通過一個方法(aggregate)理解spark的rdd
RDD(Resilient Distributed Dataset,彈性分散式資料集)是Spark的基本抽象和核心物件。它代表了一個具有容錯性的,可以並行運算的元素集合。有兩種途徑建立RDD:將Driver Program中的集合型別轉換成RDD,或者是引用外部儲存系統中的資料集,例如檔案系統、HDFS、HBase、資料庫等。
在實際的應用中,可以認識到Spark是一個獨立的計算引擎。先從外部載入資料,然後執行分散式的並行運算,最後將運算結果進行顯示、儲存、或發給其他應用。其中執行分散式的並行運算位於核心的位置,理解RDD也就很關鍵,這篇文章想通過一個例子來講述RDD。
這篇文章假設你已經搭建好了一個多節點的Spark叢集,並理解Spark Driver Progame和Spark叢集的區別。
一個簡單的問題
先考慮一個簡單的問題:假設有一個數組[1, 2, 3, 4, 5, 6],要計算出兩個值:陣列元素個數、陣列元素之和。假設將結果儲存為一個元組,初始值為(0,0),那麼只要像下面這樣遍歷求和就可以了。其邏輯如下:
迴圈次數 | 輸入值 | 結果 |
---|---|---|
初始 | - | (0,0) |
第1次遍歷 | (0,0), 1 | (1,1) |
第2次遍歷 | (1,1), 2 | (3,2) |
第3次遍歷 | (3,2), 3 | (6,3) |
第4次遍歷 | (6,3), 4 | (10,4) |
第5次遍歷 | (10,4), 5 | (15,5) |
第6次遍歷 | (15,5), 6 | (21,6) |
每一次迴圈的輸入值是:上一次迴圈的結果 和 本次迴圈到的陣列元素。其運算的邏輯為:對元組的第1個元素進行結果累加,第2個元素進行+1。
如果這個陣列有1000萬個元素,這樣算可能很慢,我們可能想到的一個辦法是什麼?將陣列分成兩個,[1,2,3] 和 [4,5,6] 然後分別進行運算,再將結果累加起來。和上面這種方式不同的是,多了一個步驟:彙總兩個拆分後陣列的運算結果。
迴圈次數 | 輸入值 | 結果 |
---|---|---|
初始 | - | (0,0) |
第1次遍歷 | (0,0), 1 | (1,1) |
第2次遍歷 | (1,1), 2 | (3,2) |
第3次遍歷 | (3,2), 3 | (6,3) |
迴圈次數 | 輸入值 | 結果 |
---|---|---|
初始 | - | (0,0) |
第1次遍歷 | (0,0), 4 | (4,1) |
第2次遍歷 | (4,1), 5 | (9,2) |
第3次遍歷 | (9,2), 6 | (15,3) |
再對兩個陣列進行分別運算後,為了得到最終結果,還要進行一次合併結果的操作:
迴圈次數 | 輸入值 | 結果 |
---|---|---|
初始 | - | (0,0) |
第1次遍歷 | (0,0), (6,3) | (6,3) |
第2次遍歷 | (6,3), (15,3) | (21,6) |
顯然,這是一個CPU密集型(CPU-Bound)運算,對於多核CPU而言,比如說4核CPU,將陣列分為4個,然後在本機進行運算是完全可以的。
它依然存在兩個問題:
- 沒有統一的程式設計模型,我們需要手動地去拆分資料,分別運算,再彙總結果。
- 當運算量超出單臺計算機的能力,需要將資料和運算分佈到多臺計算機上時,則異常複雜。
此時,就是Spark出來救場的時候了。可以看到,從原理上來講,Spark所解決的事情很簡單;從實現過程上來講,則非常複雜。
RDD的aggregate方法
上面計算資料元素個數和之和的例子,實際上來自Spark/Rdd/aggregate方法的官方文件。我提前將它引出,只是為了說明aggregate方法解決一個什麼問題。
aggregate接受3個引數:
- 第1個引數是一個元組,儲存元素個數和元素之和的初始值,顯然,為(0,0);
- 第2個引數是一個函式(或Lambda表示式),表示對每個拆分後的陣列執行的操作;
- 第3個引數也是一個函式(或Lamda表示式),表示對拆分後陣列的運算結果執行的操作。
這裡先看第2個引數的寫法:
def seqOp(t, x): #print "seqOp: ", [t, x], [t[0] + x, t[1] + 1] return (t[0] + x, t[1] + 1)
其中t表示每次上一次迴圈的結果,由初始元組(0,0)演化而來,x表示初識陣列的值。官方文件這裡一個最大的問題是將變數名命名成了x,y,像下面這樣:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
這樣很容易造成一個誤會,就是以為兩個引數x, y,分別代表了元組的第一個值和第二個值,因為x,y成對出現時,一般都以為它們存在關聯。
再看第3個引數的寫法:
def comOp(x, y): print "comOp: ", [x, y],[x[0] + y[0], x[1]+y[1]] return (x[0] + y[0], x[1] + y[1])
這裡的x和y均為元組,x為之前所有拆分陣列的運算結果累加,y為當前正在處理的陣列運算結果。顯然,只需要將它們分別累加就可以了:return (x[0] + y[0], x[1] + y[1])。
Spark的叢集模式和部署模式
Spark有好幾種叢集模式,包括 local、standalone、yarn、mesos和Kubernetes;有兩種部署模式,client和cluster。上面程式碼的執行方式,在不同的部署模式上會有所不同。為了更好地瞭解這個例子,我們先採用 local[1]/client來執行這段程式碼:
local[1]/client
當使用local[1]時,表示只使用1個CPU核心,這種方式在生產環境並不常用。它在本機執行,並只使用一個CPU核心,因此實際上沒有並行運算。
from pyspark import SparkContext, SparkConf # $SPARK_HOME/bin/spark-submit /data/pyjobs/test/aggregate.py conf = SparkConf().setAppName("test_aggregate").setMaster("local[1]") sc = SparkContext(conf=conf) def seqOp(x, y): print "seqOp: ", [x, y], [x[0] + y, x[1] + 1] return x[0] + y, x[1] + 1 def comOp(x, y): print "comOp: ", [x, y],[x[0] + y[0], x[1]+y[1]] return x[0] + y[0], x[1] + y[1] c = sc.parallelize([1, 2, 3, 4, 5, 6]).aggregate((0, 0), seqOp, comOp) print c
輸出如下,可以看到這個結果和我們第一節中表格中的例子完全相同,並且注意到只執行了一次comOp:
seqOp:[(0, 0), 1] [1, 1] seqOp:[(1, 1), 2] [3, 2] seqOp:[(3, 2), 3] [6, 3] seqOp:[(6, 3), 4] [10, 4] seqOp:[(10, 4), 5] [15, 5] seqOp:[(15, 5), 6] [21, 6] comOp:[(0, 0), (21, 6)] [21, 6] (21, 6)
local[2]/client
程式碼只需要修改一處,意思是採用2個CPU核心:
conf = SparkConf().setAppName("test_aggregate").setMaster("local[2]")
運算的結果如下:
seqOp:[(0, 0), 1] [1, 1] seqOp:[(1, 1), 2] [3, 2] seqOp:[(3, 2), 3] [6, 3] seqOp:[(0, 0), 4] [4, 1] seqOp:[(4, 1), 5] [9, 2] seqOp:[(9, 2), 6] [15, 3] comOp:[(0, 0), (6, 3)] [6, 3] comOp:[(6, 3), (15, 3)] [21, 6] (21, 6)
可以看到這段程式碼的seqOp分為了兩部分,一部分從[(0, 0), 1]開始,一部分從[(0, 0), 4]開始。同時,執行了兩次comOp,因為陣列被分為了兩個。同時,可以看到這個輸出和第一節下面的表格完全一致。
類似地,可以改為local[3],結果如下:
seqOp:[(0, 0), 1] [1, 1] seqOp:[(1, 1), 2] [3, 2] seqOp:[(0, 0), 3] [3, 1] seqOp:[(3, 1), 4] [7, 2] seqOp:[(0, 0), 5] [5, 1] seqOp:[(5, 1), 6] [11, 2] combOp:[(0, 0), (3, 2)] [3, 2] combOp:[(3, 2), (7, 2)] [10, 4] combOp:[(10, 4), (11, 2)] [21, 6] (21, 6)
可見,藉助於Spark,我們可以輕鬆地改變並行運算的數目。
本來這個例子到此就結束了,但還可以延伸一下,看下將部署模式改為cluster,以及提交到叢集會有什麼區別。
local[2]/cluster
當部署模式改為cluster時,配置如下:
conf = SparkConf().setAppName("test_aggregate").setMaster("local[2]")\ .set("spark.submit.deployMode", "cluster")
運算結果如下:
seqOp:[seqOp: ( 0, 0), 4][( [4, 1] seqOp:0[(4, , 01)), , 15]] [9 , [21], 1]seqOp: [(9, seqOp:2)[, (16, ]1 )[, 215], 3 ][ 3, 2] seqOp:[(3, 2), 3] [6, 3] comOp:[(0, 0), (6, 3)] [6, 3] comOp:[(6, 3), (15, 3)] [21, 6] (21, 6)
可以看到上面輸出的次序變亂,因為seqOp函式並行執行,同時在控制檯進行輸出。在實際應用中,由於local叢集模式幾乎只用於開發和測試,因此,部署模式選用client就好了。
standalone/client
當叢集模式為standalone時,配置如下:
conf = SparkConf().setAppName("test_aggregate").setMaster("spark://hadoop01:7077")\ .set("spark.submit.deployMode", "client")
當前,對於Python應用,Standalone叢集模式不支援Cluster部署。
此時,再次執行,因為seqOp是在不同的伺服器上執行的,當執行print函式時,是輸出到當前執行這段程式碼的主機上。因此,執行在Driver Programe的機器上,即執行spark-submit提交python程式碼的機器上,沒有任何的顯示。輸出如下所示:
comOp:[(0, 0), (6, 3)] [6, 3] comOp:[(6, 3), (15, 3)] [21, 6] (21, 6)
至此,我們已經完成了這個例子。可以看到,Spark是一個分散式的運算引擎,通過它,我們可以將自各種資料來源的資料,在不同的機器上進行並行運算,並獲得最終結果。總體上,是一種分而治之的思想,和MapReduce的解決思路是類似地。只不過,Spark的執行效能要高出MapReduce很多(官方參考10~100倍)。
感謝閱讀,希望這篇文章能給你帶來幫助!