一點資訊 SparkSQL 查詢引擎實踐
“本文分析SparkSQL ThriftServer工作原理,修改Spark SQL原始碼並實現了SQL 查詢進度的計算,最後展示了一點資訊基於Presto+SparkSQL+Hive的Web查詢引擎 ”
01
—
問題背景:SparkSQL ThriftServer 無法獲取查詢進度
目前公司的分散式Adhoc查詢有以下幾種:Presto,Hive,SparkSQL。公司外部開源系統裡比較知名的有Redash,Zeppelin等查詢工具。
以上查詢工具均支援:Hive、SparkSQL、Mysql等查詢資料來源,SQL格式化,結果進行排序等。Redash還支援語法動態提示等功能,非常的方便。
然而很遺憾的是: 上述開源工具,無論是對接SparkSQL還是對接HiveServer2都有一個致命的問題就是—— 無法知曉查詢的進度 ,甚至是像Presto這種本身帶有進度的查詢引擎,在接入Redash等系統後,也無法知曉當前查詢的時間、進度、剩餘時間等。
查詢進度重要嗎?
從人機互動的角度看,快查詢進度條意義不大,但是Hive等MR查詢 長時間的盲目等待是難以接受的(Hive命令列提供大致的MR進度展示)。 如MysqlWorkBench和Navicat Mysql等工具
-
Mysql查詢大都是毫秒級別返回,不顯示進度條。
-
匯入匯出資料庫時以上工具都有進度條,即長時間作業需要進度條。
我們的目標是 :深入分析SparkSQL ThriftServer的工作過程,並通過修改原始碼等方式來獲取執行的狀態,讓Presto查詢、Hive查詢、SparkSQL查詢均能感知到進度。本文重點介紹了SparkSQL查詢進度的實現。
02
—
SparkSQL ThriftServer工作原理剖析
圖1: SparkSQL ThriftServer工作的5個步驟
從上圖可知,SparkSQL ThriftServer的啟動過程分 5個步驟。 在步驟3 ExecuteStatement時,SparkSQL 為每一個查詢建立Spark的Job,並隨機生成UUID。
一點大資料團隊修改SparkSQL原始碼實現定製化的JobID生成,並跟蹤每個SparkJob的執行過程實現SQL執行進度的計算。
1:SparkSQL初始化
圖2: SparkSQL ThriftServer 初始化
如上圖所知,SparkSQL ThriftServer初始化包含以下幾個步驟:
-
Java Main函式啟動,初始化HiveServer2,建立 ThriftBinaryCLIService
-
建立Hive模組內的 ThriftBinaryCLIService 並傳入 SparkSQL模組內的SparkSqlCliService
-
建立SparkSQLSessionManager 通過反射方式將CliService中的SessionManager設定為自身
-
使用反射是 因為Hive程式碼中的CLiService的SessonManager為 Private方法
-
Thrift BinaryService啟動,並根據配置的ThriftSever埠、地址等資訊建立TThreadPoolServer。
2:建立連結OpenSession:
客戶端(Beeline CLI) 向伺服器ThriftBinaryServer傳送OpenSession的Thrift請求:
-
SparkSQLCLIService 呼叫初始化過程中建立的SessionManager 建立Session
-
此時根據配置的不同為Thrift的會話繫結一個SparkSession
-
每個Session設定自己的SessionManager和OperationManager
-
SessonManager用於管理整個Hive的查詢週期,比如建立連結、關閉連結等。
-
OperationManager 完成具體任務:用於執行具體的任務邏輯比如查看錶資訊、檢視元資料
-
SparkSQL重寫OperationManager的 newExecuteStatementOperation()方法, 轉換為Spark作業
3:發起SQL查詢ExecuteStatement(重點)
圖3: Beeline 客戶端發起查詢SQL請求到 SparkSQL Server
-
客戶端向伺服器ThriftBinaryServer傳送ExecuteStatement的Thrift請求,伺服器接到請求後獲取當且Thrift會話中的 HiveSession ,並呼叫 ExecuteStatement 方法。
-
繼續呼叫 executeStatementInternal 函式。此方法呼叫 步驟2:建立鏈 接OpenSession : 中初始化好的 OperationManager執行 newExecuteStatementOperation ()
-
HiveSession繼續呼叫重寫的 runInternal ()方法,將Hive請求進一步呼叫 execute() 來完成Spark的計算邏輯。
-
Spark收到查詢請求後,在 SparkExecuteStatementOperation 中為每一個查詢語句隨機生成UUID作為JobId:
-
此處是關鍵點即Spark通過UUID來識別每一個的Job,並在Spark UI顯示
-
而UUID和SQL語句間沒有直接的關聯
-
Spark執行SparkContext.Sql() 函式直接計算獲取結果的DataFrame,結果計算完成將statement標記為完成狀態。返回給客戶端 OperationHandle 控制代碼作為查詢結果的依據。
4: 客戶端獲取結果FetchResults
客戶端檢查Opeartion的狀態,當發現是FINISHED狀態時候,請求結果的元資料Meta資訊和結果內容資訊,分別由客戶端的 FetchResults 請求和 GetResultSetMetadata 請求
-
Spark 將DataFrame的每一行轉換為Thrift結果的Row
-
Spark 將DataFrame的 DataFrame的行列資訊轉換為 Hive的元資料資訊。
5:關閉並釋放連結
03
—
設計原理與程式碼修改
步驟1: 一點查詢客戶端根據SQL生成UUID並儲存對應關係:
-
我們根據Hive JDBC介面,封裝了自己的查詢客戶端,並生成唯一的查詢ID
-
QueryID 樣例:
-
20181014_101745_57dbf79bc1e1f27f82911b00b91ddcde
-
客戶端通過 jdbc:hive2://IP:Port/dw? shark.sparksql.queryid=$QueryID 這樣的連線來訪問後端的ThriftServer。
步驟2: 伺服器接受SQL和QueryID,將Spark的JobID設定為QueryID:
圖4: 如何實現HiveJDBC客戶端傳遞UUID到伺服器端
-
Spark ThriftServer 建立連線請求的時候,根據Url中的引數資訊,HiveSession在建立連結的時候解析 JDBC:hive2 url引數 並且將引數 “shark.sparksql.queryid“ , 以Key-Value 的形式存放於 HiveConf
-
在OperationManager中,當建立ExecuteStatementOperation時候,讀取所有HiveConf中的配置資訊,寫入到SparkContxt的配置中。
-
在 執行 ExecuteStatementOperation 時,取上下文 “shark.sparksql.quer yid" 對應的值,並將其設定為JobID, 此ID在SparkUI中可以查詢到
-
statementId=sqlContext.getConf("shark.sparksql.queryid", statementId)
修改後的SparkSQL執行頁面中JobID已經生效
步驟3: 客戶端讀取 Spark RESTful API 獲取查詢進度:
我們採用Spark的Restful API來獲取每一個查詢ID的進展情況
Spark RESTful API的訪問地址為
http://HOST:4041/api/v1/applications
-
Step1:查詢當前Spark的ApplicationID:http://HOST:4041/api/v1/applications
-
Step2:查詢當前Spark的所有Job列表:http://HOST:4041/api/v1/applications/application_id/jobs application_id為 Step_1中 取到的application_id
-
Step3: 根據QueryID,可以獲取到當前QueryID下所有任務數(numTasks),已經完成的任務數(numCompletedTasks),跳過的Task數(numSkippedTasks)
-
計算公式為:完成任務數+跳過任務數)/(總任務數)
圖6: Spark 的Restful API獲取任務狀態
步驟4:效果展示
通過以上對SparkSQL的改進,我們的Shark 一點大資料查詢系統可以支援SparkSQL的進度查詢。
圖7: 藍色為SparkSQL 查詢進度條 當前進度9%
另外我們通過SQL語法分析工具、LeaderLatch(ZK Leader選擇)等技術實現了SQL格式化、高可用的Presto等,使得我們的Shark查詢引擎達到了生產系統的要求。
圖8: 一點大資料查詢系統與Redash、Zeppelin對比分析
04
—
未來規劃
1: SparkSQL ThriftServer的分散式化:
-
智慧的客戶端根據查詢型別、資料分割槽大小提交至不同的SparkSQL ThriftServer。
-
對資料取樣、小查詢等建立單獨的SparkSQL查詢後端服務,提高查詢的響應速度對大資料量、高記憶體佔用查詢分配大記憶體的Executor保障查詢的穩定性。
-
統一的Web客戶端維護查詢ID和後端SparkThriftServer的關係,支援失敗重試、同時提交等功能。
2 : 使用者無感知的 智慧查詢引擎選擇
-
綜合利用Presto、SparkSQL、Hive等查詢系統。短查詢優先Presto,長查詢優先Hive
-
統一查詢語法,消除Presto語法和Hive語法直接的不同。
-
Presto和Hive的UDF不同,需要建立統一的UDF管理系統實現語法和程式碼的統一管理。