Spark in Practice (2): Learning UDFs
This is 崔斯特's sixty-seventh original article.
Before getting into the actual data processing, I think it is worth taking some time to learn and understand UDFs.
UDF
UDF stands for User-Defined Function. It is a Spark SQL feature for defining new column-based functions that extend the vocabulary of Spark SQL's DSL for transforming Datasets.
I found a fairly easy-to-follow introductory example on Databricks:
Register the function as a UDF
val squared = (s: Int) => { s * s }
spark.udf.register("square", squared)
Call the UDF in Spark SQL
spark.range(1, 20).registerTempTable("test")
%sql select id, square(id) as id_squared from test
My understanding: first define a function squared that returns the square of its input, then register it, binding it to the name square, after which square can be called directly in Spark SQL.
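In a Databricks notebook the %sql magic runs the query directly; in plain Scala code the equivalent is spark.sql(...). A minimal sketch, assuming an existing SparkSession named spark (createOrReplaceTempView is the current name for the deprecated registerTempTable used above):

val squared = (s: Int) => s * s                     // the function itself
spark.udf.register("square", squared)               // bind it to the SQL name "square"
spark.range(1, 20).createOrReplaceTempView("test")  // same table as above
spark.sql("select id, square(id) as id_squared from test").show()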
Example 1: Temperature Conversion
We will define a UDF that converts temperatures in the following JSON data from degrees Celsius to degrees Fahrenheit:

{"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
{"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
{"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
{"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
{"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
{"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
...

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object ScalaUDFExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Scala UDF Example")
    val spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

    val ds = spark.read.json("temperatures.json")
    ds.createOrReplaceTempView("citytemps")

    // Register the UDF with our SparkSession
    spark.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0))
    spark.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
  }
}
Example 2: Time Conversion
case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount: Double)

val x = sc.parallelize(Array(
  Purchase(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
  Purchase(123, 247, "2007-12-12", "15:30", "PST", 300.22),
  Purchase(189, 254, "2007-12-13", "00:50", "EST", 122.19),
  Purchase(187, 299, "2007-12-12", "07:30", "UTC", 524.37)
))

val df = sqlContext.createDataFrame(x)
df.registerTempTable("df")
The custom function:
def makeDT(date: String, time: String, tz: String) = s"$date$time$tz"
sqlContext.udf.register("makeDt", makeDT(_: String, _: String, _: String))

// Now we can use our function directly in SparkSQL.
sqlContext.sql("SELECT amount, makeDt(date, time, tz) from df").take(2)
// but not outside
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2) // fails
If you want to use the function outside of SQL, you have to create the UDF through org.apache.spark.sql.functions.udf:
import org.apache.spark.sql.functions.udf

val makeDt = udf(makeDT(_: String, _: String, _: String))
// now this works
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2)
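The distinction to keep straight: spark.udf.register makes a function callable by name inside SQL strings, while functions.udf wraps a Scala function into one that consumes and produces Column expressions, which is what the DataFrame DSL works with.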
Hands-On Practice
Let's write a UDF that buckets Int values into ranges:
val formatDistribution = (view: Int) => {
  if (view < 10) {
    "<10"
  } else if (view <= 100) {
    "10~100"
  } else if (view <= 1000) {
    "100~1K"
  } else if (view <= 10000) {
    "1K~10K"
  } else if (view <= 100000) {
    "10K~100K"
  } else {
    ">100K"
  }
}
Register it:
session.udf.register("formatDistribution", UDF.formatDistribution)
SQL:
session.sql("select user_id, formatDistribution(variance_digg_count) as variance from video")
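The same bucketing can also be used through the DataFrame API. A minimal sketch, assuming session is the SparkSession above and videoDf is a hypothetical DataFrame standing in for whatever was registered as the video table:

import org.apache.spark.sql.functions.udf
import session.implicits._  // enables the $"column" syntax

// videoDf is hypothetical; it represents the DataFrame behind "video"
val formatDistributionUdf = udf(formatDistribution)
videoDf.select($"user_id", formatDistributionUdf($"variance_digg_count").as("variance")).show()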
Having written this much and looking back at UDFs, I feel they are essentially a convenience for doing classification, conversion, and similar per-value operations, much like ordinary functions in Python, except that "UDF" usually refers specifically to functions used inside Spark SQL. I also noticed they are quite similar to user-defined functions in SQL:
CREATE FUNCTION [owner.]<function name>
(
    -- parameters the function needs; there may be none
    [<@param1> <parameter type>]
    [, <@param2> <parameter type>]…
)
RETURNS TABLE
AS
RETURN
(
    -- the SQL query whose result set is returned
    SELECT statement
)
/*
 * Create a table-valued function that returns personal information
 * for account holders whose total transaction amount exceeds 10,000
 */
create function getCustInfo()
returns @CustInfo table  -- returns a table
(
    CustID int,                                     -- account ID
    CustName varchar(20) not null,                  -- account name
    IDCard varchar(18),                             -- ID card number
    TelePhone varchar(13) not null,                 -- telephone
    Address varchar(50) default('Address unknown')  -- address
)
as
begin
    -- populate the table variable
    insert into @CustInfo
    select CustID, CustName, IDCard, TelePhone, Address
    from AccountInfo
    where CustID in (
        select CustID from CardInfo
        where CardID in (
            select CardID from TransInfo
            group by CardID, transID, TransType, TransMoney, TransDate
            having sum(TransMoney) > 10000
        )
    )
    return
end
go

-- Call the table-valued function
select * from getCustInfo()
go
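Strictly speaking, the template above defines an inline table-valued function (a single RETURN wrapping one SELECT), while getCustInfo is a multi-statement table-valued function that declares a table variable and fills it before returning, but the spirit is the same.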
They really do seem to achieve the same thing in different ways~