MapReduce 論文和實驗筆記
總結下 6.824 MapReduce lab 的論文筆記和實驗過程。本文程式碼: MIT6.824/mapreduce
前言
自己的 nsx PRC 框架 v0.2 需支援分散式環境下服務變更的通知,對 zookeeper 不想只停留在會用的層面,於是學習 MIT 6.824 Distributed Systems ,本文是 Lec1: MapReduce 的學習筆記。
論文閱讀
問題來源
在 2004 年以前,Google 團隊為處理各種原始資料實現了上百個專用計算程式,比如對原始網頁文件生成倒排索引,資料量少時單機處理就行,但資料量過大後單機處理就太耗時了,只能將資料分佈在多個主機上並行處理,最後聚合各節點生成的索引資料。
分散式計算降低了耗時,但也必須解決一些問題:如何分發資料?多節點如何保證負載均衡?如何處理節點失效?… 多節點排程工作並不簡單。類似的大資料處理場景在谷歌內部還有很多。於是 Jeff 團隊就將類似場景的處理流程抽象出來,在 2004 年推出了分散式計算模型 MapReduce,使用者只需自定義的 2 個數據的處理函式:
- 如何分割原始資料:Map Func
- 如何聚合中間資料:Reduce Func
之後就能使用 MR 模型通過加節點來提高計算效率,關於節點容錯、資料分發、負載均衡的問題 MR 都已處理。
MR 應用例項
舉個例子:對文字檔案中的單詞計數,論文中 MR 內部處理的虛擬碼如下:
// MR 處理的資料是 Key-Value 結構的 // key 是檔名,value 是整個檔案內容,對空格隔開的每個單詞進行計數 "1" 操作 map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1"); // 對每個單詞 key 都進行詞頻累加 reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
MR 內部隱藏了 Map 操作後將計數結果寫入中間檔案,Reduce 操作從中間檔案讀取計數資訊的細節。只需使用者自己實現 Map/Reduce 的邏輯,即可將任務分散式並行化執行來大幅提升效率。
MR 資料結構
MR 面向的輸入輸出資料都是 Key-Value 結構,其中 k-v 約定都是 string 型別,值可能是整個原始檔案的內容,也可能是數字,取決於使用者自定義的 map func 和 reduce func,這 2 個函式的關聯型別是固定的:
map(k1,v1)→ list(k2,v2) reduce (k2,list(v2)) → list(v2)
- map 處理 raw data 對每個內容點 k 都生成 k-v pair
- reduce 對每個 k 都聚合其 list 中間資料,最終生成聚合結果
MR 執行流程
首先說明 MapReduce 是一種分散式計算模型,不是某個開源的分散式排程框架,所以在不同場景下對模型的實現程式碼並不相同。比如對文字檔案中的單詞進行計數,可使用 MR 模型來實現分散式執行,系統執行流程如下:
- Split: MR 將 input files 分割為 M 個子資料片段
- Fork:將使用者程式 fork 後執行在多個節點上,整個執行過程會執行 M 個 map task 和 R 個 reduce task,節點由一個 master 和多個 worker 組成,其中 master 負責排程空閒的 worker 來執行 task
- Map:
- 被分配到 map task 的 worker 先讀取子資料片段,再呼叫 Map func 來處理原始資料生成 k-v pairs 中間資料,並通過分割槽函式歸類到 R 個子檔案,隨後寫入本地磁碟。
- map worker 將中間檔案的儲存地址通知 master,隨後 master 將 R 箇中間檔案分配給 reduce worker 處理
- Reduce:
- 被分配到 reduce task 的 worker 使用 RPC 讀取 map worker 上 master 給定的中間檔案。雖然同一個 key 會被分割槽到同一個中間檔案,但 key 與 key 之間的寫入順序是無序的,所以讀取完畢後需對 keys 統一進行排序,否則輸出到 output file 的結果是無序的,會導致 master merge 的結果也是無序的。
- 排序完畢後對每個 key 都呼叫 Reduce func 來進行聚合,並將結果輸出到對應分割槽的 output file 中
- Merge:master 等所有的 map task 和 reduce task 都執行完畢後,將 R 個 output files 進行 Merge 操作,整個分散式計算過程執行結束。
處理容錯
worker 失效
master 會定期向各個 worker 傳送 ping 心跳包,若在超時時間內收到 pong 包則認為 worker 有效,否則標記為失效不可用。MR 會將原來分配到失效 worker 的 task 回收重新分配到其他可用的 worker 上重新執行。值得區分的是:
- map worker 失效後是必須重新執行 map task,因為 worker 崩潰了無法處理本地中間檔案的訪問請求
- reduce worker 如果失效但已生成聚合檔案,通知給了 master 該檔案在 GFS 中的位置,就不必重新執行
相比論文中如上第 2 種 worker 容錯機制,實際在 lab 中都是出錯超時直接將 task 分配給其他 worker 執行,因為 lab 並沒有實現 reduce worker 輸出結果到 output file 後通知 master 的機制。
master 失效
這種情形論文中只給出了簡單的處理方案,即定期將 master 的所有狀態作為快照 checkpoint 持久化到磁碟,當 master 崩潰後從最近的 checkpoint 啟動新的 master 繼續處理。
因為 MR 要求 map func/reduce func 都必須是功能函式,不保留任何狀態,即相同的輸入能得到相同的輸出。所以 master 恢復後繼續排程執行是可行的。
GFS
論文中的容錯機制充分利用了 GFS 分散式檔案系統的檔案原子特性,可直接看原論文是怎麼用的。
MR 實用技巧
分割槽函式
在 Map 階段,使用 hash(key) mod R
來保證每個 key 都能彙總到同一中間檔案,保證所有 key 儘可能地均勻分佈在 R 箇中間檔案中。
保證順序
在 Reduce 階段從中間檔案中讀取資料時得先排序再聚合,這樣聚合到 output files 之間就是分段有序的。
實驗筆記
Part1. 處理 MR 的輸入輸出
注意 map task 的輸出要能被 reduce task 讀取,所以要約定好 encode/decode 結構。lab 註釋建議每行儲存一個 JSON Encode 後的 k-v,自己做的時候可以 []k-v
直接 Marshal,在 reduce 讀取時對應反序列化。
Part2. 單機版 word count
對照如上 MR 例項流程圖實現。
Part3. 分散式版 MR
lab 中使用 RPC 在本地模擬分散式多節點的情況,有新 worker 註冊後會通知 registerChan,所以在 schedule 排程時候可 select 從 channel 接收新 worker,或者複用舊的空閒 worker 處理 task
Part4. 實現 worker 容錯
lab 沒有完全按照 paper 來,map/reduce worker 崩潰了都是直接分配給其他可用的空閒 worker 進行 re-execute,需注意多個 schedule goroutine 之間等待可用 worker 時可能出現競態條件,自己嘗試了幾個方案後總結了一些經驗:
-
不要通過共享記憶體來進行通訊,而是通過通訊來共享記憶體
lab 程式碼已有的 registerChan 是無緩衝 channel,如果複用它來在多個 schedule 間共享空閒 worker,那 map 任務結束後再向它傳送 worker 會直接阻塞,此時使用緩衝 channel 合適。反之如果將 worker 的狀態變更放到記憶體中共享使用,多個 schedule goroutine 共享和更新 worker,可能產生很多競態條件。
-
鎖使用粒度要小,要集中,不要寫多個 goroutine 可能會產生競態的程式碼
如果跑測試有時候通過,有時候在 lock 周圍 panic,那可能程式碼中還隱藏有競態條件,而且不好復現除錯。總之不要濫用 channel 和 sync.Mutex,梳理好多個 goroutine 之間資料傳遞方式後再寫程式碼也不遲。
Part5. 使用 MR 生成倒排索引
注意給每個單詞打分,將分數高的單詞排在前邊即可通過測試。
總結
MR 要求使用者先把任務拆分成 Map / Reduce 2 種子任務,MR 併發地執行 Map 任務產生中間資料,再併發地執行 Reduce 任務聚合資料,最終 Merge 後輸出結果,在處理海量資料時通過直接加 worker 就能提高系統性能,水平擴充套件能力很高。
MR 是一種開創性的分散式計算模型,能通過拆分邏輯實現任務的分散式執行,比較通用化。現如今,雖然有的分散式場景下 MR 模型不是最佳解決方案,但對於設計和學習分散式系統依然很有價值。
感謝 Jeffrey 團隊