Spark Streaming + Kafka 的 offset 管理方法
點選上方藍字關注【 北郵郭大寶 】
最近實習需要開發一套Spark Streaming的實時流處理專案,內心還是很期待的。說來慚愧,做大資料開發實習一年有餘了,都是離線批處理的任務,還沒親自操刀部署上線一套流處理專案。正好有這樣的機會,補一補自己的知識短板。
Spark Streaming的基礎知識在之前在公眾號裡有過介紹,是實習小夥伴沙利民同學總結的,寫的很不錯。需要參考的同學可以點選學習。 ofollow,noindex">Spark Streaming從入門到實踐(上) Spark Streaming從入門到實踐(下)
本篇主要介紹一下Spark Streaming在消費Kafka過程中,當出現程式掛掉重啟後,找到上次消費過的最後一次資料,確保kafka資料精確消費一次(exactly-once)的目的。
1. 背景介紹
首先先說下kafka三種訊息傳遞保證:
-
at most once,訊息至多會被髮送一次,但如果產生網路延遲等原因訊息就會有丟失
-
at least once,訊息至少會被髮送一次,上面既然有訊息會丟失,那麼給它加一個訊息確認機制即可解決,但是訊息確認階段也還會出現同樣問題,這樣訊息就有可能被髮送兩次。
-
exactly once,訊息只會被髮送一次,這是我們想要的效果
對於資料的消費者,自然希望最後一種情況。kafka通過offset記錄每個topic中的每個partition的訊息的位置資訊,如果程式掛掉重啟的話,程式可以找到上次最後一次消費訊息的offset位置,從下一個開始繼續消費資料。如果沒有儲存每個分割槽已經讀取的offset,那麼Spark Streaming就沒有辦法從上次斷開(停止或者報錯導致)的位置繼續讀取訊息。
2. 常見offset管理方法介紹
常見的offset管理辦法隨著kafka的完善不斷改進的,offset可以通過多種方式管理,一般的步驟如下:
-
DStream初始化的時候,需要指定一個包含每個topic的每個分割槽的offset用於讓DStream從指定位置讀取資料
-
消費資料
-
更新offsets並儲存
2.1 checkpoints
Spark Streaming的checkpoints是最基本的儲存狀態資訊的方式,一般是儲存在HDFS中。但是最大的問題是如果streaming程序升級的話,checkpoints的資料無法使用,所以幾乎沒人使用。
2.2 Zookeeper
Spark Streaming任務在啟動時會去Zookeeper中讀取每個分割槽的offsets。如果有新的分割槽出現,那麼他的offset將會設定在最開始的位置。在每批資料處理完之後,使用者需要可以選擇儲存已處理資料的一個offset或者最後一個offset來儲存。這種辦法需要消費者頻繁的去與Zookeeper進行互動,如果期間 Zookeeper 叢集發生變化,那 Kafka 叢集的吞吐量也跟著受影響。
2.3 一些外部資料庫(HBase,Redis等)
可以藉助一些可靠的外部資料庫,比如HBase,Redis儲存offset資訊,Spark Streaming可以通過讀取這些外部資料庫,獲取最新的消費資訊。
2.4 kafka
Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消費者API即非同步提交API。你可以在你確保你處理後的資料已經妥善儲存之後使用commitAsync API(非同步提交API來向Kafka提交offsets。新的消費者API會以消費者組id作為唯一標識來提交offsets。
3. 例項demo
本文通過兩個例子,展示Streaming管理offset的方法。
3.1 使用kafka自身儲存offset
Kafka版本0.10.1.1,已預設將消費的offset遷入到了Kafka一個名為__consumer_offsets的Topic中。所以我們讀寫offset的物件正是這個topic,實際上,一切都已經封裝好了,直接呼叫相關API即可。
3.2 使用redis儲存offset
根據官網推薦的使用步驟,其實也就兩部分,一是從外部資料庫中讀取offset,第二是完成一個批次的操作後,更新庫裡的offset值。本demo以儲存在redis為例,簡要列出相關程式碼。
3.2.1 RedisUtils
基本的redis工具類
3.2.2 StreamingTest
前面的配置spark,kafka與之前一樣,之後首先配置redis資訊,並從redis讀取topic各分割槽對應的lastoffset
再建立stream流,每個partition處理完成後,需要更新這個partition的offset值。
4. 測試
兩個都已經親測可以正常使用,這裡就簡單拿offset儲存在kafka這個例子做個測試。我的測試版本是spark 2.1.2 + kafka 0.10.0.1。
首先啟動streaming程式,在kafka producer終端打進幾個測試資料
可以看到確實消費了三條資料,把程式終止。
再向這個topic打進三條資料,打完後重啟streaming程式。
我們可以看到,確實是從最新的三條資料開始消費的,之前的資料沒有被消費。做到了exactly onece
5. 總結
相對於離線批處理,流處理需要考慮的地方更多一些,對程式的魯棒性要求也更高。對offset的管理只是其中最基本的一環,鑑於有些坑還沒踩過,關於Spark Streaming也不是很熟悉,如有紕漏或不對的地方,請多指教。
參考文獻
-
https://www.cnblogs.com/lianxuan1768/p/8127553.html 《KAFKA OFFSET的儲存問題》
-
https://blog.csdn.net/sand_clock/article/details/68486599 《kafka中的offset》
-
https://www.jianshu.com/p/ef3f15cf400d 《Spark Streaming 管理 Kafka Offsets 的方式探討》
-
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html 《Spark官網》
-
http://lxw1234.com/archives/2018/02/901.htm 《實時流計算、Spark Streaming、Kafka、Redis、Exactly-once、實時去重》
-
https://github.com/Talefairy/sparkStreaming-offset-to-zk
歡迎閱讀、訂閱、轉載、收藏
長按識別二維碼