PySpark工具機器學習(自然語言處理和推薦系統) 第2章:資料處理
本章介紹資料處理。資料處理是執行Machine Learning所需的關鍵步驟,因為我們需要對資料進行清理、過濾、合併和轉換,以得到我們所需的資料形式。
快速入門
- 讀取
>>> from pyspark.sql import SparkSession >>> spark=SparkSession.builder.appName('data_processing').getOrCreate() >>> df=spark.read.csv('sample_data.csv',inferSchema=True, header=True) >>> df.columns ['ratings', 'age', 'experience', 'family', 'mobile'] >>> len(df.columns) 5 >>> df.count() 33 >>> df.printSchema() root |-- ratings: integer (nullable = true) |-- age: integer (nullable = true) |-- experience: double (nullable = true) |-- family: integer (nullable = true) |-- mobile: string (nullable = true) >>> df.show(3) +-------+---+----------+------+-------+ |ratings|age|experience|family| mobile| +-------+---+----------+------+-------+ |3| 32|9.0|3|Vivo| |3| 27|13.0|3|Apple| |4| 22|2.5|0|Samsung| +-------+---+----------+------+-------+ only showing top 3 rows >>> df.select('age','mobile').show(5) +---+-------+ |age| mobile| +---+-------+ | 32|Vivo| | 27|Apple| | 22|Samsung| | 37|Apple| | 27|MI| +---+-------+ only showing top 5 rows >>> df.describe().show() +-------+------------------+------------------+------------------+------------------+------+ |summary|ratings|age|experience|family|mobile| +-------+------------------+------------------+------------------+------------------+------+ |count|33|33|33|33|33| |mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|null| | stddev|1.1188806636071336|6.18527087180309| 6.770731351213326|1.8448330794164254|null| |min|1|22|2.5|0| Apple| |max|5|42|23.0|5|Vivo| +-------+------------------+------------------+------------------+------------------+------+
- 新增列
>>> df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False) +-------+---+----------+------+-------+----------------+ |ratings|age|experience|family|mobile |age_after_10_yrs| +-------+---+----------+------+-------+----------------+ |3|32 |9.0|3|Vivo|42| |3|27 |13.0|3|Apple|37| |4|22 |2.5|0|Samsung|32| |4|37 |16.5|4|Apple|47| |5|27 |9.0|1|MI|37| |4|27 |9.0|0|Oppo|37| |5|37 |23.0|5|Vivo|47| |5|37 |23.0|5|Samsung|47| |3|22 |2.5|0|Apple|32| |3|27 |6.0|0|MI|37| +-------+---+----------+------+-------+----------------+ only showing top 10 rows >>> from pyspark.sql.types import StringType,DoubleType >>> df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False) +-------+---+----------+------+-------+----------+ |ratings|age|experience|family|mobile |age_double| +-------+---+----------+------+-------+----------+ |3|32 |9.0|3|Vivo|32.0| |3|27 |13.0|3|Apple|27.0| |4|22 |2.5|0|Samsung|22.0| |4|37 |16.5|4|Apple|37.0| |5|27 |9.0|1|MI|27.0| |4|27 |9.0|0|Oppo|27.0| |5|37 |23.0|5|Vivo|37.0| |5|37 |23.0|5|Samsung|37.0| |3|22 |2.5|0|Apple|22.0| |3|27 |6.0|0|MI|27.0| +-------+---+----------+------+-------+----------+ only showing top 10 rows
上面show()的第二個參數False表示即使欄位內容超過20個字元也不會被截斷顯示。
- 資料過濾
>>> df.filter(df['mobile']=='Vivo').show() +-------+---+----------+------+------+ |ratings|age|experience|family|mobile| +-------+---+----------+------+------+ |3| 32|9.0|3|Vivo| |5| 37|23.0|5|Vivo| |4| 37|6.0|0|Vivo| |5| 37|13.0|1|Vivo| |4| 37|6.0|0|Vivo| +-------+---+----------+------+------+ >>> df.filter(df['mobile']=='Vivo').select('age','ratings', 'mobile').show() +---+-------+------+ |age|ratings|mobile| +---+-------+------+ | 32|3|Vivo| | 37|5|Vivo| | 37|4|Vivo| | 37|5|Vivo| | 37|4|Vivo| +---+-------+------+ >>> df.filter(df['mobile']=='Vivo').filter(df['experience']>10).show() +-------+---+----------+------+------+ |ratings|age|experience|family|mobile| +-------+---+----------+------+------+ |5| 37|23.0|5|Vivo| |5| 37|13.0|1|Vivo| +-------+---+----------+------+------+ >>> df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show() +-------+---+----------+------+------+ |ratings|age|experience|family|mobile| +-------+---+----------+------+------+ |5| 37|23.0|5|Vivo| |5| 37|13.0|1|Vivo| +-------+---+----------+------+------+
- 唯一值
>>> df.select('mobile').distinct().show() +-------+ | mobile| +-------+ |MI| |Oppo| |Samsung| |Vivo| |Apple| +-------+ >>> df.select('mobile').distinct().count() 5
- 分組和排序
>>> df.groupBy('mobile').count().show(5,False) +-------+-----+ |mobile |count| +-------+-----+ |MI|8| |Oppo|7| |Samsung|6| |Vivo|5| |Apple|7| +-------+-----+ >>> df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False) +-------+-----+ |mobile |count| +-------+-----+ |MI|8| |Oppo|7| |Apple|7| |Samsung|6| |Vivo|5| +-------+-----+ >>> df.groupBy('mobile').mean().show(5,False) +-------+------------------+------------------+------------------+------------------+ |mobile |avg(ratings)|avg(age)|avg(experience)|avg(family)| +-------+------------------+------------------+------------------+------------------+ |MI|3.5|30.125|10.1875|1.375| |Oppo|2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286| |Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333| |Vivo|4.2|36.0|11.4|1.8| |Apple|3.4285714285714284|30.571428571428573|11.0|2.7142857142857144| +-------+------------------+------------------+------------------+------------------+ >>> df.groupBy('mobile').sum().show(5,False) +-------+------------+--------+---------------+-----------+ |mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)| +-------+------------+--------+---------------+-----------+ |MI|28|241|81.5|11| |Oppo|20|199|72.5|10| |Samsung|25|172|52.0|11| |Vivo|21|180|57.0|9| |Apple|24|214|77.0|19| +-------+------------+--------+---------------+-----------+ >>> df.groupBy('mobile').max().show(5,False) +-------+------------+--------+---------------+-----------+ |mobile |max(ratings)|max(age)|max(experience)|max(family)| +-------+------------+--------+---------------+-----------+ |MI|5|42|23.0|5| |Oppo|4|42|23.0|2| |Samsung|5|37|23.0|5| |Vivo|5|37|23.0|5| |Apple|4|37|16.5|5| +-------+------------+--------+---------------+-----------+ >>> df.groupBy('mobile').max().show(3,False) +-------+------------+--------+---------------+-----------+ |mobile |max(ratings)|max(age)|max(experience)|max(family)| 
+-------+------------+--------+---------------+-----------+ |MI|5|42|23.0|5| |Oppo|4|42|23.0|2| |Samsung|5|37|23.0|5| +-------+------------+--------+---------------+-----------+ only showing top 3 rows >>> df.groupBy('mobile').min().show(5,False) +-------+------------+--------+---------------+-----------+ |mobile |min(ratings)|min(age)|min(experience)|min(family)| +-------+------------+--------+---------------+-----------+ |MI|1|27|2.5|0| |Oppo|2|22|6.0|0| |Samsung|2|22|2.5|0| |Vivo|3|32|6.0|0| |Apple|3|22|2.5|0| +-------+------------+--------+---------------+-----------+
- 聚合
>>> df.groupBy('mobile').agg({'experience':'sum'}).show(5,False) +-------+---------------+ |mobile |sum(experience)| +-------+---------------+ |MI|81.5| |Oppo|72.5| |Samsung|52.0| |Vivo|57.0| |Apple|77.0| +-------+---------------+