使用 Flink 進行高吞吐,低延遲和 Exactly-Once 語義流處理
在本文中,我們將深入探討Flink新穎的檢查點機制是如何工作的,以及它是如何取代舊架構以實現流容錯和恢復。我們在各種型別的流處理應用程式上對Flink效能進行測試,並通過在Apache Storm(一種廣泛使用的低延遲流處理器)上執行相同的實驗來進行對比。
1. 流式架構的演變
在流處理中保證高效能同時又要保證容錯是比較困難的。在批處理中,當作業失敗時,可以容易地重新執行作業的失敗部分來重新計算丟失的結果。這在批處理中是可行的,因為檔案可以從頭到尾重放。但是在流處理中卻不能這樣處理。資料流是無窮無盡的,沒有開始點和結束點。帶有緩衝的資料流可以進行重放一小段資料,但從最開始重放資料流是不切實際的(流處理作業可能已經運行了數月)。此外,與僅具有輸入和輸出的批處理作業相比,流計算是有狀態的。這意味著除了輸出之外,系統還需要備份和恢復運算元狀態。由於這個問題比較複雜,因此在開源生態系統中有許多容錯方法去嘗試解決這個問題。
用於容錯機制對整個框架的架構有比較深的影響。很難將不同的容錯機制進行外掛化來整合到現有框架中。因此,在我們選擇一個流處理框架時,容錯機制也非常重要。
下面我們去了解一下流處理架構的幾種容錯方法,從 記錄確認
到 微批處理
, 事務更新
和 分散式快照
。我們將從以下幾個維度討論不同方法的優缺點,最終選出融合不同方法優點適合流處理程式的融合方法:
-
Exactly-once語義保證:故障後有狀態運算元的狀態能正確恢復。
-
低延遲:延遲越低越好。許多應用程式需要亞秒級延遲。
-
高吞吐量:隨著資料速率的增長,通過管道推送大量資料至關重要。
-
強大的計算模型:框架應該提供一種程式設計模型,該模型不會對使用者進行限制並保證應用程式在沒有故障的情況下容錯機制的低開銷。
-
流量控制:處理速度慢的運算元產生的背壓應該由系統和資料來源自然吸收,以避免因消費緩慢而導致崩潰或效能降低。
上面我們忽略了一個共同特徵,即失敗後的快速恢復,不是因為它不重要,而是因為(1)所有介紹的系統都能夠基於完全並行進行恢復,以及(2)在有狀態的應用程式中,狀態恢復的瓶頸通常在於儲存而不是計算框架。
2. 記錄確認機制(Apache Storm)
雖然流處理已經在金融等行業中廣泛使用多年,但最近流處理才成為大資料基礎設施的一部分。開源框架的可用性一直在推動著流處理的發展。開源中第一個廣泛使用的大規模流處理框架可能是Apache Storm。Storm使用上游備份和記錄確認機制來保證在失敗後重新處理訊息。請注意,Storm不保證狀態一致性,任何可變狀態的處理都需要委託給使用者處理(Storm的Trident API可以確保狀態一致性,將在下一節中介紹)。
記錄確認機制的工作方式如下:運算元(Operator)處理的每條記錄都會向前一個運算元發回一個已經處理過的確認。拓撲的 Source 節點會保留它產生的所有元組的一個備份。直到 Source 中記錄收到其所產生的到Sink的所有派生記錄的確認之後,就可以刪除上游備份的備份。當發生故障時,如果沒有收到所有的確認,Source 記錄就會重新發送。這種機制可以保證不會丟失資料,但很有可能導致重複處理記錄(我們稱之為At-Least-Once語義)。Storm 使用一種巧妙的機制來實現這種容錯方式,每個資料來源記錄只需要幾個位元組的儲存空間就可以跟蹤確認。Twitter Heron 保持與 Storm 相同的確認機制,但提高了記錄重放的效率(從而提高了恢復時間和整體吞吐量)。
純記錄確認體系結構,無論其效能如何,都無法提供Exactly-once語義保證,這給應用程式開發人員帶來了刪除重複資料的負擔。對於某些應用程式而言,這可能是可以接受的,但對於其他應用程可能並不能接受。Storm的機制的其他問題還有吞吐量低和流量控制的問題,在出現背壓的情況下,記錄確認機制會導致上游節點錯誤地認為資料處理出現了故障(實際上僅僅是由於出現背壓導致記錄來不及處理,而無法傳送確認)。這導致了基於微批處理的流式架構的發展。
3. 微批處理(Apache Storm Trident, Apache Spark Streaming)
Storm和先前的流處理系統不能滿足一些對大規模應用程式至關重要的需求,特別是高吞吐量,快速並行恢復以及託管狀態的Exactly-once語義。
容錯流式架構的下一個發展階段是微批處理或離散化流。這個想法非常簡單:為了解決連續計算模型(處理和緩衝記錄)所帶來的記錄級別同步的複雜性和開銷,連續計算分解為一系列小的原子性的批處理作業(稱為微批次)。每個微批次可能會成功或失敗,如果發生故障,重新計算最近的微批次即可。
微批處理可以應用到現有引擎(有能力進行資料流計算)之上。例如,可以在批處理引擎(例如,Spark)之上應用微批處理以提供流功能(這是Spark Streaming背後的基本機制),也可以應用於流引擎之上(例如,Storm)提供 Exactly-once 語義保證和狀態恢復(這是Storm Trident背後的基本機制)。在 Spark Streaming 中,每個微批次計算都是一個 Spark 作業,而在 Trident 中,每個微批次中的所有記錄都會被合併為一個大型記錄。
基於微批處理的系統可以實現上面列出的多個的要求(Exactly-once語義保證,高吞吐量),但也有不足之處:
-
程式設計模型:例如,Spark Streaming 為了實現上述目標將程式設計模型從流式更改為微批處理。這意味著使用者不能再以任意時間而只能在檢查點間隔的倍數上視窗化資料,並且模型不支援許多應用程式所需的基於計數或會話的視窗。這些都是應用程式開發人員關注的問題。具有可以改變狀態的持續計算的純流模型為使用者提供了更大的靈活性。
-
流量控制:使用基於時間劃分批次的微批次架構仍然具有背壓的問題。如果微批處理在下游操作中(例如,由於計算密集型運算元處理不過來或向外部儲存資料比較緩慢)比在劃分批次的運算元(通常是源)中花費更長時間,則微批次將花費比配置更長的時間(譯者注:下游運算元處理速度跟不上劃分批次運算元的速度)。這導致越來越多的批次排隊,或者導致微批量增加。
-
延遲:微批處理顯然將作業的延遲限制為微批處理的延遲。雖然亞秒級的批處理延遲對於簡單應用程式是可以接受的,但是具有多個網路Shuffle的應用程式很容易將延遲時間延長到數秒。
微批處理模型的最大侷限可能是它連線了兩個不應連線的概念:應用程式定義的視窗大小和系統內部恢復間隔。假設一個程式(下面示例是Flink程式碼)每5秒聚合一次記錄:
dataStream .map(transformRecords) .groupBy("sessionId") .window(Time.of(5, TimeUnit.SECONDS)) .sum("price")
這些應用非常適合微批處理模型。系統累積5秒的資料,對它們求和,並在流上進行一些轉換後進行聚合計算。下游應用程式可以直接消費上述5秒聚合後的結果,例如在儀表板上顯示。但是,現在假設背壓開始起作用(例如,由於計算密集型的 transformRecords 函式),或者 devops 團隊決定通過將時間間隔增加到10秒來控制作業的吞吐量。然後,微批次大小變的不可控制(在出現背壓情況下),或者直接變為10秒(第二種情況)。這意味著下游應用程式(例如,包含最近5秒統計的 Web 儀表板)讀取的聚合結果是錯誤的,下游應用程式需要自己處理此問題。
微批處理可以實現高吞吐量和Exactly-Once語義保證,但是當前的實現是以拋棄低延遲,流量控制和純流式程式設計模型為代價實現上述目標的。顯而易見的問題是,是否有兩全其美的辦法:保持連續計算模型的所有優勢,同時還能保證Exactly-Once語義並提供高吞吐量。後面討論的後流式架構實現了這種組合,並將微批處理作為流式處理的基本模型。
通常,微批處理被認為是一次處理一條記錄的替代方法。這是一種錯誤的認識:連續運算元不需要一次只處理一條記錄。實際上,所有精心設計的流處理系統(包括下面討論的Flink和Google Dataflow)在通過網路傳輸之前都會緩衝許多記錄,同時又具備連續的處理能力。
4. 事務更新(Google Cloud Dataflow)
在保留連續運算元模型(低延遲,背壓容錯,可變狀態等)的優勢的同時又保證Exactly-Once處理語義的一種強大而又優雅的方法是原子性地記錄需要處理的資料並更新到狀態中。失敗後,可以從日誌中重新恢復狀態以及需要處理的記錄。
例如,在Google Cloud Dataflow中實現了此概念。系統將計算抽象為一次部署並長期執行的連續運算元的DAG。在Dataflow中,shuffle是流式傳輸的,中間結果不需要物化(譯者注:資料的計算結果放在記憶體中)。這為低延遲提供了一種自然的流量控制機制,因為中間過程的緩衝可以緩解背壓,直到反壓到資料來源(基於Pull模式的資料來源,例如Kafka消費者可以處理這個問題)。該模型還提供了一個優雅的流程式設計模型,可以提供更豐富的視窗而不是簡單的基於時間的視窗以及可以更新到長期可變的狀態中。值得注意的是,流程式設計模型包含微批處理模型。
例如,下面Google Cloud Dataflow程式(請參閱此處)會建立一個會話視窗,如果某個key的事件沒有在10分鐘內到達,則會觸發該會話視窗。在10分鐘後到達的資料將會啟動一個新視窗。
PCollection<String> items = ...; PCollection<String> session_windowed_items = items.apply( Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
這在流處理模型中很容易實現,但在微批處理模型中卻很難實現,因為視窗不對應固定的微批量大小。
這種架構的容錯工作原理如下。通過運算元的每個中間記錄與更新的狀態以及後續產生的記錄一起建立一個提交記錄,該記錄以原子性的方式追加到事務日誌或插入到資料庫中。在失敗的情況下,重放部分資料庫日誌來恢復計算狀態,以及重放丟失的記錄。
Apache Samza遵循類似的方法,但只能提供At-Least-Once語義保證,因為它使用Apache Kafka作為後臺儲存。Kafka(現在)不提供事務編寫器,因此對狀態和後續產生的流記錄的更新不能作為原子事務一起提交。
事務更新體系結構具有許多優點。事實上,它實現了我們在本文開頭提出的所有需求。該體系結構的基礎是能夠頻繁地寫入具有高吞吐量的分散式容錯儲存系統中。分散式快照(在下一節中進行了解釋)將拓撲的狀態作為一個整體進行快照,從而減少了對分散式儲存的寫入量和頻率。
5. 分散式快照(Apache Flink)
提供 Exactly-Once 語義保證的問題實際上可以歸結為確定當前流式計算所處的狀態(包括正在處理中記錄以及運算元狀態),然後生成該狀態的一致性快照,並將快照儲存在持久儲存中。如果可以經常執行上述操作,那麼從故障中恢復意味著僅從持久儲存中恢復最新快照,並將流資料來源(例如,Apache Kafka)回退到生成快照的時間點再次’重放’。Flink的分散式快照演算法可以參閱本文; 在下文中,我們會給出一個簡短的總結。
Flink的分散式快照演算法基於Chandy和Lamport在1985年設計的一種演算法,用於生成分散式系統當前狀態的一致性快照(詳細介紹請參閱此處),不會丟失資訊且不會記錄重複項。Flink使用的是Chandy Lamport演算法的一個變種,定期生成正在執行的流拓撲的狀態快照,並將這些快照儲存到持久儲存中(例如,儲存到HDFS或記憶體中檔案系統)。檢查點的儲存頻率是可配置的。
這有點類似於微批處理方法,兩個檢查點之間的所有計算都作為一個原子整體,要麼全部成功,要麼全部失敗。然而,只有這一點的相似之處。Chandy Lamport演算法的一個重要特點是我們不必在流處理中按下’暫停’按鈕(譯者注:等待檢查點完成之後)來排程下一個微批次。相反,常規資料處理一直執行,資料到達就會處理,而檢查點發生在後臺。引用原始論文:
全域性狀態檢測演算法會被設計在基礎計算上:它們必須同時執行,但不能改變基礎計算。
因此,這種架構融合了連續運算元模型(低延遲,流量控制和真正的流程式設計模型),高吞吐量,Chandy-Lamport演算法提供的的Exactly-Once語義保證的優點。除了備份有狀態計算的狀態(其他容錯機制也需要這樣做)之外,這種容錯機制幾乎沒有其他開銷。對於小狀態(例如,計數或其他統計),備份開銷通常可以忽略不計,而對於大狀態,檢查點間隔會在吞吐量和恢復時間之間進行權衡。
最重要的是,該架構將應用程式開發與流量控制和吞吐量控制分開。更改快照間隔對流作業的結果完全沒有影響,因此下游應用程式可以放心地依賴於接收到的正確結果。
Flink的檢查點機制基於流經運算元和渠道的 ‘barrier’(認為是Chandy Lamport演算法中的一種’標記’)來實現。Flink的檢查點的描述改編自Flink文件。
‘Barrier’ 在 Source 節點中被注入到普通流資料中(例如,如果使用Apache Kafka作為源,’barrier’ 與偏移量對齊),並且作為資料流的一部分與資料流一起流過DAG。’barrier’ 將記錄分為兩組:當前快照的一部分(’barrier’ 表示檢查點的開始),以及屬於下一個快照的那些組。
‘Barrier’ 流向下游並在通過運算元時觸發狀態快照。運算元首先將所有流入的流分割槽的 ‘barrier’ 對齊(如果運算元具有多個輸入),並會快取較快的分割槽資料(上游來源較快的流分割槽將被緩衝資料以等待來源較慢的流分割槽)。當運算元從每個輸入流中都收到 ‘barrier’ 時,會檢查其狀態(如果有)並寫到持久儲存中。一旦完成狀態寫檢查,運算元就將 ‘barrier’ 向下遊轉發。請注意,在此機制中,如果運算元支援,則狀態寫檢查既可以是非同步(在寫入狀態時繼續處理),也可以是增量(僅寫入更改)。
一旦所有資料接收器(Sink)都收到 ‘barrier’,當前檢查點就完成了。故障恢復意味著只需恢復最新的檢查點狀態,並從最新記錄的 ‘barrier’ 對應的偏移量重放資料來源。分散式快照在我們在本文開頭所要達到的所有需求中得分很高。它們實現了高吞吐量的Exactly-Once語義保證,同時還保留了連續運算元模型以及低延遲和自然流量控制。
6. 結論
下表總結了我們討論的每個體系結構如何支援這些功能。
記錄確認機制 | 微批次 | 事務更新 | 分散式快照 | |
---|---|---|---|---|
語義保證 | At Least Once | Exactly Once | Exactly One | Exactly One |
延遲 | 非常低 | 高 | 低(事務延遲) | 非常低 |
吞吐量 | 低 | 高 | 中到高(取決於分散式事務儲存的吞吐量) | 高 |
計算模型 | 流式 | 微批次 | 流式 | 流式 |
容錯開銷 | 高 | 低 | 取決於分散式事務儲存的吞吐量 | 低 |
流控制 | 有問題 | 有問題 | 自然 | 自然 |
應用程式邏輯與容錯分離 | 部分(超時很重要) | 否(微批量大小會影響語義) | 是 | 是 |
7. 實驗
為了說明Apache Flink的效能,我們進行了一系列實驗來研究吞吐量,延遲以及容錯機制的影響。下面所有實驗都是在Google Compute Engine上進行,使用30個例項,每個例項包含4個核心和15 GB記憶體。所有Flink實驗均使用截至7月24日的最新程式碼修訂版進行,所有Storm實驗均使用0.9.3版。可以在此處找到用於評估的所有程式碼。
7.1 吞吐量
我們在有30節點120個核的叢集上測量Flink和Storm在兩個不同程式上的吞吐量。第一個程式是並行流式grep任務,它在流中搜索包含與正則表示式匹配的字串的事件。
Flink實現了每個核每秒150萬個元素的連續吞吐量。這樣叢集的總吞吐量達到每秒1.82億個元素。測試得到的Flink延遲為零,因為作業不涉及網路,也不涉及微批處理。當開啟Flink容錯機制,設定每5秒進行一次Checkpoint,我們只看到吞吐量的輕微下降(小於2%),沒有引入任何延遲。
Storm叢集在關閉記錄確認機制的情況下(因此沒有任何準確性保證)實現了每核每秒約82,000個元素的吞吐量,99%的處理延遲在10毫秒左右。叢集的總吞吐量為每秒57萬個元素。當啟用記錄確認機制(保證At-Least-Once語義)時,Storm的吞吐量降至每核每秒4700個元素,延遲也增加到30-120毫秒。接下來,我們配置了Storm Trident,其微批量大小為200,000個元組。Trident實現了每個核每秒75,000個元素的吞吐量(總吞吐量與關閉容錯機制的Storm的大致相同)。然而,這是以3000毫秒的延遲(99%)為代價的。
我們可以看到Flink的吞吐量比Trident高出20倍以上,吞吐量比Storm高300倍。在保持高吞吐的情況下,Flink還保證延遲為零。我們還看到,不使用微批次處理模型,高吞吐量不會以延遲為代價。Flink還連結資料來源和接收器任務形成任務鏈,從而僅在單個JVM內交換記錄控制代碼。
我們還進行了如下實驗,將核從40個擴充套件到120個。跟我們預期一樣,所有框架都線性擴充套件,因為grep是一個易於並行處理的程式。現在讓我們看一個不同的實驗,它按鍵進行流分組,從而通過網路對流進行Shuffle。我們在30臺機器的叢集中執行此作業,其系統配置與以前相同。Flink實現了每核每秒大約720,000個事件的吞吐量,啟動檢查點後降至690,000。請注意,Flink在每個檢查點都要備份運算元的狀態,而Storm則不支援。此示例中的狀態相對較小(計數和摘要,每個檢查點每個運算元的大小小於1M)。具有At-Least-Once語義保證的Storm具有每核每秒約2,600個事件的吞吐量。
7.2 延遲
能夠處理大規模事件是至關重要的。另一方面,在流處理中尤為重要的是延遲。對於欺詐檢測或IT安全等應用程式,以毫秒為單位對事件進行處理意味著可以防止問題出現,而超過100毫秒的延遲通常意味著問題只能在問題發生之後才能發現,而這時候發現意義已經不大了。
當應用程式開發人員可以允許一定的延遲時,通常需要把延遲限制在一定範圍內。我們測量流記錄分組作業的幾個延遲界限,該作業通過網路對資料進行Shuffle。下圖顯示了觀察到的中位數延遲,以及第90百分位,第95百分位和第99百分位延遲(例如,50毫秒的第99百分位的延遲意味著99%的元素到達管道的末端不到50毫秒)。
在以最大吞吐量執行時,Flink的中位數延遲為26毫秒,第99百分位延遲為51毫秒,這意味著99%的延遲都低於51毫秒。開啟Flink的檢查點機制(啟用Exact-Once語義保證)並沒有增加可觀察到的延遲。但此時,我們確實看到較高百分位數的延遲增加,觀察到的延遲大約為150毫秒(譯者注:沒太搞懂)。出現延遲增加的原因是需要對齊流,運算元等待接收所有輸入的 ‘barrier’。Storm具有非常低的中位數延遲(1毫秒),並且第99百分位的延遲也是51毫秒。
對於大多數應用程式而言,讓人感興趣的是能夠在可接受的延遲上維持高吞吐量,具體取決於特定應用程式的延遲要求。在Flink中,使用者可以使用緩衝區超時時間(Buffer Timeout)來調整可接受的延遲。這是什麼意思?Flink運算元在將記錄傳送到下一個運算元之前會暫儲存在緩衝區中。通過指定緩衝區超時時間,例如10毫秒,我們可以告訴Flink在緩衝區滿了時或者到達10毫秒時傳送緩衝區資料。較低的緩衝區超時時間通常意味著較低的延遲,可能以吞吐量為代價。在上面的實驗中,緩衝區超時時間設定為50毫秒,這解釋了為什麼99%的記錄延遲在50毫秒以下。
下面說明了延遲如何影響Flink的吞吐量。因為較低的延遲保證意味著緩衝較少的資料,所以必然會產生一定的吞吐量成本。下圖顯示了不同緩衝區超時時間下的Flink吞吐量。該實驗再次使用流記錄分組作業。
如果指定緩衝區超時時間為零,流經運算元的記錄不會緩衝而是立即轉發到下一個運算元。在這個延遲優化設定中,Flink可以實現50%的元素延遲在0毫秒,以及99%的元素延遲在20毫秒以下。相應的吞吐量為每個核每秒24,500個事件。當我們增加緩衝區超時時間時,我們會看到延遲增加,吞吐量會同時增加,直到達到吞吐量峰值,緩衝區填充速度超過超時到期時間。緩衝區超時時間為50毫秒時,系統達到每個核每秒750,000個事件的吞吐量峰值,99%的處理延遲在50毫秒以下。
7.3 正確性與恢復開銷
我們最後一個實驗評估了檢查點機制的正確性和恢復的開銷。我們執行一個需要強一致性的流式程式,並定期殺死工作節點。
我們的測試程式受到網路安全/入侵檢測等用例的啟發,並使用規則來檢查事件序列的有效性(例如,身份驗證令牌,登入,服務互動)。該程式從Kafka並行讀取事件流,並通過生成的實體(例如,IP地址或使用者ID)對事件進行分組。對於每個事件,程式根據一些規則檢測目前為止生成實體對應事件序列是否有效(例如,’服務互動’ 必須在 ‘登入’ 之前)。對於無效序列,程式會發布警報。如果沒有Exactly-Once語義保證,發生故障時將不可避免地產生無效的事件序列並導致程式釋出錯誤警報。
我們在一個30節點的叢集中執行這個程式,其中 YARN chaos monkey
程序每5分鐘殺死一個隨機的YARN容器。我們保留備用 Worker(TaskManagers),這樣系統可以在發生故障後立即取的新資源並繼續執行,而無需等待YARN配置新容器。Flink將重新啟動失敗的 Worker 並在後臺將其加入到叢集,以確保備用Worker始終可用。
為了模擬的效果,我們使用並行資料生成器將事件推送到Kafka,這些生成器每個核的速度大約為每秒30,000個事件。下圖顯示了資料生成器的速率(紅線),以及Flink作業從Kafka讀取事件並使用規則驗證事件序列的吞吐量(藍線)。