Spark 運算元講解(action 篇)
spark運算元分為兩大種,一種是 transformation運算元,另一種是 action運算元。
transformation又叫轉換運算元,它從一個 RDD到另一個 RDD是延遲執行的,不會馬上觸發作業的提交,只有在後續遇到某個 action運算元時才執行;
action運算元會觸發 SparkContext提交 Job,並將資料輸出 spark系統。今天舉例講解一下 action運算元。
1) count
就是統計 RDD 中元素個數的運算元。
舉個栗子:
val rdd =sc.parallelize(
List( "hello" , "world!" , "hi" , "beijing" ))
println (rdd.count())
輸出:
4
2) co llect
把 RDD 中的元素提取到 driver 記憶體中,返回陣列形式。
舉個栗子:
val rdd = sc .parallelize(
List ( "hello" , "world!" , "hi" , "beijing" ) , 2 )
val arr : Array[ String ] = rdd.collect()
println ( arr )
arr .foreach( println )
輸出:
[Ljava.lang.String;@760e8cc0
hello
world!
hi
beijing
3) foreach
遍歷 RDD 中的每一個元素,無返回值。此運算元用法參考上下文。
4) saveAsTextFile
把 RDD 中的資料以文字的形式儲存
舉個栗子:
val rdd =sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
rdd.saveAsTextFile(
"/home/myname/test" )
5) saveAsSequenceFile
是個 k-v 運算元,把 RDD 中的資料以序列化的形式儲存。使用此運算元的前提是 RDD 中元素是鍵值對格式。
舉個栗子:
val rdd = sc .parallelize(
List (( "a" , 1 ) , ( "b" , 2 ) , ( "c" , 3 ) , ( "c" , 4 )) , 2 )
rdd.saveAsSequenceFile(
"/home/myname/test" )
6) countByKey
是個 k-v 運算元,按 key 統計各 key 的次數,返回 Map
舉個栗子:
val rdd =sc.parallelize(
List(( "a" , 1 ) , ( "b" , 2 ) , ( "c" , 3 ) , ( "c" , 4 )) , 2 )
val res: Map[ String , Long ] = rdd
.countByKey()
res.foreach( println )
輸出:
(b,1)
(a,1)
(c,2)
7) collectAsMap
把 RDD 中元素以 Map 形式提取到 driver 端。需要注意的是如果存在多個相同 key ,後面出現的會覆蓋前面的。
舉個栗子:
val rdd = sc .parallelize(
List (( "a" , 1 ) , ( "b" , 2 ) , ( "c" , 3 ) , ( "c" , 4 )) , 2 )
val res: Map[ String , Int ] = rdd
.collectAsMap()
res.foreach( println )
輸出:
(b,2)
(a,1)
(c,4)
8) take
從 RDD 中取下標前 n 個元素,不排序。返回陣列。
舉個栗子:
val rdd =sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val take: Array[ Int ] = rdd
.take( 2 )
take.foreach( println )
輸出:
5
4
9) takeSample
從指定 RDD 中抽取樣本。第一個引數為 false 表示取過的元素不再取,為 true 表示取過的元素可以再次被抽樣;第二個引數表示取樣數量;第三個引數不好把握建議預設值
舉個栗子:
val rdd = sc .makeRDD(
Array ( "aaa" , "bbb" , "ccc" , "ddd" , "eee" ))
val sample: Array[ String ] = rdd
.takeSample( false , 2 )
sample.foreach( println )
輸出:
eee
bbb
10) first
返回 RDD 中第一個元素。
舉個栗子:
val rdd =sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val first: Int = rdd.first()
println (first)
輸出:
5
11) top
從 RDD 中按預設順序 ( 降序 ) 或指定順序取 n 個元素
舉個栗子:
val rdd =sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val take: Array[ Int ] = rdd.top( 2 )
take.foreach(println)
輸出:
9
7
12) takeOrdered
從 RDD 中取 n 個元素,與 top 運算元不同的是它是以和 top 相反的順序返回元素。
舉個栗子:
val rdd =sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val take: Array[ Int ] = rdd
.takeOrdered( 2 )
take.foreach( println )
輸出:
1
4
13) saveAsObjectFile
把 RDD 中元素序列化並儲存,底層依賴 saveAsSequenceFile
舉個栗子:
val rdd =sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
rdd.saveAsObjectFile(
"/home/myname/test" )
14)reduce
reduce 引數是一個函式,把 RDD 中的元素兩兩傳遞給此函式,然後進行計算產生一個結果,結果再與下一個值計算,如此迭代。
舉個栗子:
val rdd = sc .makeRDD(
List ( 1 , 2 , 3 , 4 , 5 ))
val result: Int = rdd
.reduce((x , y) => x + y)
println (result)
輸出:
15
15) lookup
是個 k-v 運算元,指定 key 值,返回此 key 對應的所有 v 值
舉個栗子:
val rdd1 =sc.makeRDD(
Array(( "A" , 0 ) , ( "A" , 2 ) ,
( "B" , 1 ) , ( "B" , 2 ) , ( "C" , 1 )))
val rdd2: Seq [ Int ] = rdd1
.lookup( "A" )
rdd2.foreach( println )
輸出:
0
2
16) aggregate
aggregate使用者聚合 RDD中的元素,先指定初始值,再對 RDD中元素進行區域性求和,最後全域性求和。此運算元理解起來不是特別直觀,大家感受一下。
舉個栗子:
val rdd =sc.parallelize(
List( 1 , 2 , 3 , 4 ))
val res: Int = rdd
.aggregate( 2 )(_+_ , _+_)
println (res)
輸出:
14
17) fold
fold是aggregate的簡化
舉個栗子:
val rdd = sc .parallelize(
List ( 1 , 2 , 3 , 4 ))
val res: Int = rdd
.fold( 2 )((x , y) => x + y)
println (res)
輸出:
14