使用Pandas_UDF快速改造Pandas程式碼
1. Pandas_UDF介紹
PySpark和Pandas之間改進效能和互操作性的其核心思想是將Apache Arrow作為序列化格式,以減少PySpark和Pandas之間的開銷。
Pandas_UDF是在PySpark2.3中新引入的API,由Spark使用Arrow傳輸資料,使用Pandas處理資料。Pandas_UDF是使用關鍵字pandas_udf作為裝飾器或包裝函式來定義的,不需要額外的配置。目前,有兩種型別的Pandas_UDF,分別是Scalar(標量對映)和Grouped Map(分組對映)。
1.1 Scalar
Scalar Pandas UDF 用於向量化標量操作。常常與select和withColumn等函式一起使用。其中呼叫的Python函式需要使用pandas.Series作為輸入並返回一個具有相同長度的pandas.Series。具體執行流程是,Spark將列分成批,並將每個批作為資料的子集進行函式的呼叫,進而執行panda UDF,最後將結果連線在一起。
下面的示例展示如何建立一個scalar panda UDF,計算兩列的乘積:
import pandas as pd from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import LongType # 宣告函式並建立UDF def multiply_func(a, b): return a * b multiply = pandas_udf(multiply_func, returnType=LongType()) x = pd.Series([1, 2, 3]) df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) # Execute function as a Spark vectorized UDF df.select(multiply(col("x"), col("x"))).show() # +-------------------+ # |multiply_func(x, x)| # +-------------------+ # |1| # |4| # |9| # +-------------------+
1.2 Grouped Map
Grouped map( 分組對映)panda udf與groupBy().apply()一起使用,後者實現了“split-apply-combine”模式。“split-apply-combine”包括三個步驟:
- 使用DataFrame.groupBy將資料分成多個組。
- 對每個分組應用一個函式。函式的輸入和輸出都是pandas.DataFrame。輸入資料包含每個組的所有行和列。
- 將結果合併到一個新的DataFrame中。
要使用groupBy().apply(),需要定義以下內容:
- 定義每個分組的Python計算函式,這裡可以使用pandas包或者Python自帶方法。
- 一個StructType物件或字串,它定義輸出DataFrame的格式,包括輸出特徵以及特徵型別。
需要注意的是, StructType物件中的Dataframe特徵順序需要與分組中的Python計算函式返回特徵順序保持一致。
此外, 在應用該函式之前,分組中的所有資料都會載入到記憶體,這可能導致記憶體不足丟擲異常。
下面的例子展示瞭如何使用groupby().apply()從組中的每個值中減去平均:
from pyspark.sql.functions import pandas_udf, PandasUDFType df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def subtract_mean(pdf): # pdf is a pandas.DataFrame v = pdf.v return pdf.assign(v=v - v.mean()) df.groupby("id").apply(subtract_mean).show() # +---+----+ # | id|v| # +---+----+ # |1|-0.5| # |1| 0.5| # |2|-3.0| # |2|-1.0| # |2| 4.0| # +---+----+
1.3 Grouped Aggregate
Grouped aggregate P anda UDF類似於Spark聚合函式。 Grouped aggregate P anda UDF常常與groupBy().agg()和pyspark.sql.window一起使用。它定義了來自一個或多個的聚合。級數到標量值,其中每個pandas.Series表示組或視窗中的一列。
需要注意的是,這種型別的UDF不支援部分聚合,組或視窗的所有資料都將載入到記憶體中。此外,目前只支援 Grouped aggregate Pandas UDFs 的無界視窗。
下面的例子展示瞭如何使用這種型別的UDF來計算groupBy和視窗操作的平均值:
from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql import Window df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) @pandas_udf("double", PandasUDFType.GROUPED_AGG) def mean_udf(v): return v.mean() df.groupby("id").agg(mean_udf(df['v'])).show() # +---+-----------+ # | id|mean_udf(v)| # +---+-----------+ # |1|1.5| # |2|6.0| # +---+-----------+ w = Window \ .partitionBy('id') \ .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # +---+----+------+ # | id|v|mean_v| # +---+----+------+ # |1| 1.0|1.5| # |1| 2.0|1.5| # |2| 3.0|6.0| # |2| 5.0|6.0| # |2|10.0|6.0| # +---+----+------+
2. 快速使用Pandas_UDF
需要注意的是schema變數裡的欄位名稱為pandas_dfs() 返回的spark dataframe中的欄位,欄位對應的格式為符合spark的格式。
這裡,由於pandas_dfs()功能只是選擇若干特徵,所以沒有涉及到欄位變化,具體的欄位格式在進入pandas_dfs()之前已通過printSchema()列印。如果在pandas_dfs()中使用了pandas的reset_index()方法,且儲存index,那麼需要在schema變數中第一個欄位處新增'index'欄位及格式(下段程式碼註釋內容)。
import pandas as pd from pyspark.sql.types import * from pyspark.sql import SparkSession from pyspark.sql.functions import pandas_udf, PandasUDFType spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate() df3 = spark.createDataFrame( [(18862669710, '/未知型別', 'IM傳檔案', 'QQ接收檔案', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582), (18862669710, '/未知型別', 'IM傳檔案', 'QQ接收檔案', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582), (18862228190, '/移動終端', '移動終端應用', '移動騰訊視訊', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582), (18862669710, '/未知型別', '訪問網站', '搜尋引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)], ('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class')) def compute(x): result = x[['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class', 'start_time', 'end_time']] return result schema = StructType([ # StructField("index", DoubleType()), StructField("online_account", LongType()), StructField("terminal_type", StringType()), StructField("action_type", StringType()), StructField("app", StringType()), StructField("access_seconds", DoubleType()), StructField("datetime", StringType()), StructField("outid", LongType()), StructField("class", LongType()), StructField("end_time", TimestampType()), StructField("start_time", TimestampType()), ]) @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP) def g(df): print('ok') mid = df.groupby(['online_account']).apply(lambda x: compute(x)) result = pd.DataFrame(mid) # result.reset_index(inplace=True, drop=False) return result df3 = df3.withColumn("end_time", df3['datetime'].cast(TimestampType())) df3 = df3.withColumn('end_time_convert_seconds', df3['end_time'].cast('long').cast('int')) time_diff = df3.end_time_convert_seconds - df3.access_seconds df3 = df3.withColumn('start_time', time_diff.cast('int').cast(TimestampType())) df3 = df3.drop('end_time_convert_seconds') df3.printSchema() aa = df3.groupby(['online_account']).apply(g) aa.show()
3. 優化Pandas_UDF程式碼
在上一小節中,我們是通過Spark方法進行特徵的處理,然後對處理好的資料應用@pandas_udf裝飾器呼叫自定義函式。但這樣看起來有些凌亂,因此可以把這些Spark操作都寫入pandas_udf方法中。
注意:上小節中存在一個欄位沒有正確對應的bug,而pandas_udf方法返回的特徵順序要與schema中的欄位順序保持一致!
import pandas as pd from pyspark.sql.types import * from pyspark.sql import SparkSession from pyspark.sql.functions import pandas_udf, PandasUDFType spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate() df3 = spark.createDataFrame( [(18862669710, '/未知型別', 'IM傳檔案', 'QQ接收檔案', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582), (18862669710, '/未知型別', 'IM傳檔案', 'QQ接收檔案', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582), (18862228190, '/移動終端', '移動終端應用', '移動騰訊視訊', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582), (18862669710, '/未知型別', '訪問網站', '搜尋引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)], ('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class')) def compute(x): x['end_time'] = pd.to_datetime(x['datetime'], errors='coerce', format='%Y-%m-%d') x['end_time_convert_seconds'] = pd.to_timedelta(x['end_time']).dt.total_seconds().astype(int) x['start_time'] = pd.to_datetime(x['end_time_convert_seconds'] - x['access_seconds'], unit='s') x = x.sort_values(by=['start_time'], ascending=True) result = x[['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class', 'start_time', 'end_time']] return result schema = StructType([ StructField("online_account", LongType()), StructField("terminal_type", StringType()), StructField("action_type", StringType()), StructField("app", StringType()), StructField("access_seconds", DoubleType()), StructField("datetime", StringType()), StructField("outid", LongType()), StructField("class", LongType()), StructField("start_time", TimestampType()), StructField("end_time", TimestampType()), ]) @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP) def g(df): print('ok') mid = df.groupby(['online_account']).apply(lambda x: compute(x)) result = pd.DataFrame(mid) return result df3.printSchema() aa = df3.groupby(['online_account']).apply(g) aa.show()
4. Pandas_UDF與toPandas的區別
- @pandas_udf 建立一個向量化的使用者定義函式(UDF),利用了panda的向量化特性,是udf的一種更快的替代方案,因此適用於分散式資料集。
- toPandas將分散式spark資料集轉換為pandas資料集,對pandas資料集進行本地化,並且所有資料都駐留在驅動程式記憶體中,因此此方法僅在預期生成的pandas DataFrame較小的情況下使用。
換句話說,@pandas_udf使用panda API來處理分散式資料集,而toPandas()將分散式資料集轉換為本地資料,然後使用pandas進行處理。