Apache Spark 記憶體管理詳解(下)
導讀:本文是續接上一篇《 Apache Spark記憶體管理詳解(上) 》(未閱讀的同學可以點選檢視)的內容,主要介紹兩部分: 儲存記憶體管理 ,包含 RDD的持久化機制、RDD快取的過程、淘汰和落盤 ; 執行記憶體管理 ,包含 多工間記憶體分配、Shuffle的記憶體佔用 。
儲存記憶體管理
RDD的持久化機制
彈性分散式資料集(RDD)作為Spark最根本的資料抽象,是隻讀的分割槽記錄(Partition)的集合,只能基於在穩定物理儲存中的資料集上建立,或者在其他已有的RDD上執行轉換(Transformation)操作產生一個新的RDD。轉換後的RDD與原始的RDD之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark保證了每一個RDD都可以被重新恢復。但RDD的所有轉換都是惰性的,即只有當一個返回結果給Driver的行動(Action)發生時,Spark才會建立任務讀取RDD,然後真正觸發轉換的執行。
Task在啟動之初讀取一個分割槽時,會先判斷這個分割槽是否已經被持久化,如果沒有則需要檢查Checkpoint或按照血統重新計算。 所以如果一個RDD上要執行多次行動,可以在第一次行動中使用persist或cache方法,在記憶體或磁碟中持久化或快取這個RDD,從而在後面的行動時提升計算速度。 事實上,cache方法是使用預設的MEMORY_ONLY的儲存級別將RDD持久化到記憶體,故快取是一種特殊的持久化。 堆內和堆外儲存記憶體的設計,便可以對快取RDD時使用的記憶體做統一的規劃和管理 (儲存記憶體的其他應用場景,如快取broadcast資料,暫時不在本文的討論範圍之內)。
RDD的持久化由Spark的Storage模組負責,實現了RDD與物理儲存的解耦合。 Storage模組負責管理Spark在計算過程中產生的資料,將那些在記憶體或磁碟、在本地或遠端存取資料的功能封裝了起來。 在具體實現時Driver端和Executor端的Storage模組構成了主從式的架構,即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。 Storage模組在邏輯上以Block為基本儲存單位,RDD的每個Partition經過處理後唯一對應一個Block(BlockId的格式為 rdd_RDD-ID_PARTITION-ID )。 Master負責整個Spark應用程式的Block的元資料資訊的管理和維護,而Slave需要將Block的更新等狀態上報到Master,同時接收Master的命令,例如新增或刪除一個RDD。
圖1 Storage模組示意圖
在對RDD持久化時,Spark規定了MEMORY_ONLY、MEMORY_AND_DISK等7種不同的儲存級別,而儲存級別是以下5個變數的組合:
class StorageLevel private( private var _useDisk: Boolean, //磁碟 private var _useMemory: Boolean, //這裡其實是指堆內記憶體 private var _useOffHeap: Boolean, //堆外記憶體 private var _deserialized: Boolean, //是否為非序列化 private var _replication: Int = 1 //副本個數 )
通過對資料結構的分析,可以看出儲存級別從三個維度定義了RDD的Partition(同時也就是Block)的儲存方式:
-
儲存位置: 磁碟/堆內記憶體/堆外記憶體。 如MEMORY_AND_DISK是同時在磁碟和堆內記憶體上儲存,實現了冗餘備份。 OFF_HEAP則是隻在堆外記憶體儲存,目前選擇堆外記憶體時不能同時儲存到其他位置。
-
儲存形式: Block快取到儲存記憶體後,是否為非序列化的形式。 如MEMORY_ONLY是非序列化方式儲存,OFF_HEAP是序列化方式儲存。
-
副本數量: 大於1時需要遠端冗餘備份到其他節點。 如DISK_ONLY_2需要遠端備份1個副本。
RDD快取的過程
RDD在快取到儲存記憶體之前,Partition中的資料一般以迭代器( Iterator )的資料結構來訪問,這是Scala語言中一種遍歷資料集合的方法。 通過Iterator可以獲取分割槽中每一條序列化或者非序列化的資料項(Record),這些Record的物件例項在邏輯上佔用了JVM堆內記憶體的other部分的空間,同一Partition的不同Record的空間並不連續。
RDD在快取到儲存記憶體之後,Partition被轉換成Block,Record在堆內或堆外儲存記憶體中佔用一塊連續的空間。 將Partition由不連續的儲存空間轉換為連續儲存空間的過程,Spark稱之為“展開”(Unroll)。 Block有序列化和非序列化兩種儲存格式,具體以哪種方式取決於該RDD的儲存級別。 非序列化的Block以一種DeserializedMemoryEntry的資料結構定義,用一個數組儲存所有的Java物件例項,序列化的Block則以SerializedMemoryEntry的資料結構定義,用位元組緩衝區(ByteBuffer)來儲存二進位制資料。 每個Executor的Storage模組用一個鏈式Map結構(LinkedHashMap)來管理堆內和堆外儲存記憶體中所有的Block物件的例項,對這個LinkedHashMap新增和刪除間接記錄了記憶體的申請和釋放。
因為不能保證儲存空間可以一次容納Iterator中的所有資料,當前的計算任務在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時佔位,空間不足則Unroll失敗,空間足夠時可以繼續進行。 對於序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。 而非序列化的Partition則要在遍歷Record的過程中依次申請,即每讀取一條Record,取樣估算其所需的Unroll空間並進行申請,空間不足時可以中斷,釋放已佔用的Unroll空間。 如果最終Unroll成功,當前Partition所佔用的Unroll空間被轉換為正常的快取RDD的儲存空間,如下圖2所示。
圖2 Spark Unroll示意圖
在《Apache Spark 記憶體管理詳解(上)》(可以翻閱公眾號檢視)的圖3和圖5中可以看到,在靜態記憶體管理時,Spark在儲存記憶體中專門劃分了一塊Unroll空間,其大小是固定的,統一記憶體管理時則沒有對Unroll空間進行特別區分,當儲存空間不足時會根據動態佔用機制進行處理。
淘汰與落盤
由於同一個Executor的所有的計算任務共享有限的儲存記憶體空間,當有新的Block需要快取但是剩餘空間不足且無法動態佔用時,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其儲存級別中同時包含儲存到磁碟的要求,則要對其進行落盤(Drop),否則直接刪除該Block。
儲存記憶體的淘汰規則為:
-
被淘汰的舊Block要與新Block的MemoryMode相同,即同屬於堆外或堆內記憶體
-
新舊Block不能屬於同一個RDD,避免迴圈淘汰
-
舊Block所屬RDD不能處於被讀狀態,避免引發一致性問題
-
遍歷LinkedHashMap中Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新Block所需的空間。 其中LRU是LinkedHashMap的特性。
落盤的流程則比較簡單,如果其儲存級別符合 _useDisk 為true的條件,再根據其 _deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最後將資料儲存到磁碟,在Storage模組中更新其資訊。
執行記憶體管理
多工間記憶體分配
Executor內執行的任務同樣共享執行記憶體,Spark用一個HashMap結構儲存了任務到記憶體耗費的對映。 每個任務可佔用的執行記憶體大小的範圍為 1/2N ~ 1/N ,其中N為當前Executor內正在執行的任務的個數。 每個任務在啟動之時,要向MemoryManager請求申請最少為1/2N的執行記憶體,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行記憶體,該任務才可以被喚醒。
Shuffle的記憶體佔用
執行記憶體主要用來儲存任務在執行Shuffle時佔用的記憶體,Shuffle是按照一定規則對RDD資料重新分割槽的過程,我們來看Shuffle的Write和Read兩階段對執行記憶體的使用:
-
Shuffle Write
-
若在map端選擇普通的排序方式,會採用ExternalSorter進行外排,在記憶體中儲存資料時主要佔用堆內執行空間。
-
若在map端選擇Tungsten的排序方式,則採用ShuffleExternalSorter直接對以序列化形式儲存的資料排序,在記憶體中儲存資料時可以佔用堆外或堆內執行空間,取決於使用者是否開啟了堆外記憶體以及堆外執行記憶體是否足夠。
-
Shuffle Read
-
在對reduce端的資料進行聚合時,要將資料交給Aggregator處理,在記憶體中儲存資料時佔用堆內執行空間。
-
如果需要進行最終結果排序,則要將再次將資料交給ExternalSorter處理,佔用堆內執行空間。
在ExternalSorter和Aggregator中,Spark會使用一種叫AppendOnlyMap的雜湊表在堆內執行記憶體中儲存資料,但在Shuffle過程中所有資料並不能都儲存到該雜湊表中,當這個雜湊表佔用的記憶體會進行週期性地取樣估算,當其大到一定程度,無法再從MemoryManager申請到新的執行記憶體時,Spark就會將其全部內容儲存到磁碟檔案中,這個過程被稱為溢存(Spill),溢存到磁碟的檔案最後會被歸併(Merge)。
Shuffle Write階段中用到的Tungsten是Databricks公司提出的對Spark優化記憶體和CPU使用的計劃,解決了一些JVM在效能上的限制和弊端。 Spark會根據Shuffle的情況來自動選擇是否採用Tungsten排序。 Tungsten採用的頁式記憶體管理機制建立在MemoryManager之上,即Tungsten對執行記憶體的使用進行了一步的抽象,這樣在Shuffle過程中無需關心資料具體儲存在堆內還是堆外。 每個記憶體頁用一個MemoryBlock來定義,並用 Object obj 和 long offset 這兩個變數統一標識一個記憶體頁在系統記憶體中的地址。 堆內的MemoryBlock是以long型陣列的形式分配的記憶體,其 obj 的值為是這個陣列的物件引用, offset 是long型陣列的在JVM中的初始偏移地址,兩者配合使用可以定位這個陣列在堆內的絕對地址; 堆外的MemoryBlock是直接申請到的記憶體塊,其 obj 為null, offset 是這個記憶體塊在系統記憶體中的64位絕對地址。 Spark用MemoryBlock巧妙地將堆內和堆外記憶體頁統一抽象封裝,並用頁表(pageTable)管理每個Task申請到的記憶體頁。
Tungsten頁式管理下的所有記憶體用64位的邏輯地址表示,由頁號和頁內偏移量組成:
1. 頁號:佔13位,唯一標識一個記憶體頁,Spark在申請記憶體頁之前要先申請空閒頁號。 2. 頁內偏移量:佔51位,是在使用記憶體頁儲存資料時,資料在頁內的偏移地址。
有了統一的定址方式,Spark可以用64位邏輯地址的指標定位到堆內或堆外的記憶體,整個Shuffle Write排序的過程只需要對指標進行排序,並且無需反序列化,整個過程非常高效,對於記憶體訪問效率和CPU使用效率帶來了明顯的提升 。
小結
Spark的儲存記憶體和執行記憶體有著截然不同的管理方式: 對於儲存記憶體來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要快取的RDD的Partition轉化而成; 而對於執行記憶體,Spark用AppendOnlyMap來儲存Shuffle過程中的資料,在Tungsten排序中甚至抽象成為頁式記憶體管理,開闢了全新的JVM記憶體管理機制。
參考文獻
-
《Spark技術內幕:深入解析Spark核心架構與實現原理 》 —— 第八章 Storage模組詳解
-
Spark儲存級別的原始碼
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
-
Spark Sort Based Shuffle記憶體分析
https://www.jianshu.com/p/c83bb237caa8
-
Project Tungsten: Bringing Apache Spark Closer to Bare Metal
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
-
Spark Tungsten-sort Based Shuffle分析
https://www.jianshu.com/p/d328c96aebfd
-
探索Spark Tungsten的祕密
https://github.com/hustnn/TungstenSecret/tree/master
-
Spark Task記憶體管理(on-heap&off-heap)
https://www.jianshu.com/p/8f9ed2d58a26
原文連結 http://www.leonlu.cc/profession/18-spark-memory-management-part2
↓點選這裡閱讀原文