Processing Big Data with Spark SQL and Writing the Results to Elasticsearch
Spark SQL can load data from a variety of sources, such as a MySQL database, JSON files, or CSV files. Once the data has been loaded into a DataFrame, a whole series of computations can be run on top of it.
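As a rough sketch (the file paths, table name, and connection details below are placeholders, not part of this post's project), all of these sources go through the same DataFrameReader API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("load_examples").getOrCreate()

# CSV: treat the first row as a header and infer column types from the data
df_csv = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/path/to/data.csv")

# JSON: by default Spark expects one JSON object per line
df_json = spark.read.json("/path/to/data.json")

# MySQL over JDBC (requires mysql-connector-java on the classpath)
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "some_table") \
    .option("user", "root") \
    .option("password", "xxxx") \
    .load()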
The code below walks through loading the data with Spark SQL, analysing it, and writing the results to Elasticsearch.
Dataset: Beijing PM2.5 data
Spark version: 2.3.2
Python version: 3.5.2
mysql-connector-java-8.0.11 (download)
Elasticsearch: 6.4.1
Kibana: 6.4.1
elasticsearch-spark-20_2.11-6.4.1.jar (download)
Full code:
# coding: utf-8
import sys
import os

pre_current_dir = os.path.dirname(os.getcwd())
sys.path.append(pre_current_dir)
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from settings import ES_CONF

current_dir = os.path.dirname(os.path.realpath(__file__))

spark = SparkSession.builder.appName("weather_result").getOrCreate()


def get_health_level(value):
    """
    Map a PM2.5 reading to a health level
    :param value:
    :return:
    """
    if 0 <= value <= 50:
        return "Very Good"
    elif 50 < value <= 100:
        return "Good"
    elif 100 < value <= 150:
        return "Unhealthy for Sensitive Groups"
    elif 150 < value <= 200:
        return "Unhealthy"
    elif 200 < value <= 300:
        return "Very Unhealthy"
    elif 300 < value <= 500:
        return "Hazardous"
    elif value > 500:
        return "Extreme danger"
    else:
        return None


def get_weather_result():
    """
    Return the data produced by the Spark SQL analysis
    :return:
    """
    # Load only the required columns into a DataFrame
    df_2017 = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("file://{}/data/Beijing2017_PM25.csv".format(current_dir)) \
        .select("Year", "Month", "Day", "Hour", "Value", "QC Name")

    # Inspect the schema
    df_2017.printSchema()

    # Wrap get_health_level in a UDF so it can produce a string column
    level_function_udf = udf(get_health_level, StringType())

    # Add the healthy_level column and group by it
    group_2017 = df_2017.withColumn(
        "healthy_level", level_function_udf(df_2017['Value'])
    ).groupBy("healthy_level").count()

    # Add the days and percentage columns and compute their values
    result_2017 = group_2017.select("healthy_level", "count") \
        .withColumn("days", group_2017['count'] / 24) \
        .withColumn("percentage", group_2017['count'] / df_2017.count())
    result_2017.show()

    return result_2017


def write_result_es():
    """
    Write the Spark SQL result to Elasticsearch
    :return:
    """
    result_2017 = get_weather_result()
    # ES_CONF holds the ES node address and index name
    result_2017.write.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "{}".format(ES_CONF['ELASTIC_HOST'])) \
        .mode("overwrite") \
        .save("{}/pm_value".format(ES_CONF['WEATHER_INDEX_NAME']))


write_result_es()
spark.stop()
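The script imports ES_CONF from a local settings module that is not shown in the post; a minimal sketch of what it could look like (the host and index name here are assumptions, adjust them to your cluster):

# settings.py (assumed layout)
ES_CONF = {
    "ELASTIC_HOST": "localhost",          # Elasticsearch node; port 9200 is the connector's default
    "WEATHER_INDEX_NAME": "weather2017",  # index that will hold the pm_value documents
}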
Copy mysql-connector-java-8.0.11.jar and elasticsearch-spark-20_2.11-6.4.1.jar into Spark's jars directory, then submit the Spark job.
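Alternatively, rather than copying the jars into the Spark installation, they can be supplied at submit time with --jars (the jar paths and the script name weather_es.py below are placeholders):

spark-submit \
  --jars /path/to/elasticsearch-spark-20_2.11-6.4.1.jar,/path/to/mysql-connector-java-8.0.11.jar \
  weather_es.py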
Notes:
(1) If the job fails with "ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql", Spark has not picked up the jar. In that case rebuild and reinstall pyspark from the Spark distribution:
cd /opt/spark-2.3.2-bin-hadoop2.7/python
python3 setup.py sdist
pip install dist/*.tar.gz
(2) If the job fails with "Multiple ES-Hadoop versions detected in the classpath; please use only one", there is more than one ES-Hadoop jar on the classpath, for example both elasticsearch-hadoop and elasticsearch-spark. Delete the redundant jar(s) and rebuild pyspark as above; a quick way to check for duplicates is shown below.
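To see which ES-related jars Spark will pick up (the install path below matches the one used earlier and may differ on your machine):

ls /opt/spark-2.3.2-bin-hadoop2.7/jars | grep -i elasticsearch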
Execution result:
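One way to confirm that the documents actually reached Elasticsearch is to query the index directly (the index name here follows the ES_CONF sketch above and is an assumption):

curl "localhost:9200/weather2017/_search?pretty&size=3"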
More source code is available on my GitHub at https://github.com/a342058040/Spark-for-Python, where the Spark-related techniques are implemented entirely in Python and updated continuously.