20條關於Kafka叢集應對高吞吐量的避坑指南
Apache Kafka是一款流行的分散式資料流平臺,它已經廣泛地被諸如New Relic(資料智慧平臺)、Uber、Square(移動支付公司)等大型公司用來構建可擴充套件的、高吞吐量的、高可靠的實時資料流系統。
例如,在New Relic的生產環境中,Kafka群集每秒能夠處理超過1500萬條訊息,而且其資料聚合率接近1Tbps。 可見,Kafka大幅簡化了對於資料流的處理,因此它也獲得了眾多應用開發人員和資料管理專家的青睞。
然而,在大型系統中Kafka的應用會比較複雜。如果你的Consumers無法跟上資料流的話,各種訊息往往在未被檢視之前就已經消失掉了。
同時,它在自動化資料保留方面的限制,高流量的釋出+訂閱(publish-subscribe,pub/sub)模式等,可能都會影響到系統的效能。 可以毫不誇張地說,如果那些存放著資料流的系統無法按需擴容、或穩定性不可靠的話,估計大家經常會寢食難安。
為了減少上述複雜性,我在此分享New Relic公司為Kafka叢集在應對高吞吐量方面的20項最佳實踐。
我將從如下四個方面進行展開:
-
Partitions(分割槽)
-
Consumers(消費者)
-
Producers(生產者)
-
Brokers(代理)
一、快速瞭解Kafka的概念與架構
Kafka是一種高效的分散式訊息系統。在效能上,它具有內建的資料冗餘度與彈性,也具有高吞吐能力和可擴充套件性。
在功能上,它支援自動化的資料儲存限制,能夠以“流”的方式為應用提供資料轉換,以及按照“鍵-值(key-value)”的建模關係“壓縮”資料流。
要了解各種最佳實踐,首先需要熟悉如下關鍵術語:
Message(訊息)
Kafka中的一條記錄或資料單位。每條訊息都有一個鍵和對應的一個值,有時還會有可選的訊息頭。
Producer(生產者)
Producer將訊息釋出到Kafka的topics上。Producer決定向topic分割槽的釋出方式,如:輪詢的隨機方法、或基於訊息鍵(key)的分割槽演算法。
Broker(代理)
Kafka以分散式系統或叢集的方式執行,那麼群集中的每個節點稱為一個Broker。
Topic(主題)
Topic是那些被髮布的資料記錄或訊息的一種類別。消費者通過訂閱Topic來讀取寫給它們的資料。
Topic Partition(主題分割槽)
不同的Topic被分為不同的分割槽,而每一條訊息都會被分配一個Offset,通常每個分割槽都會被複制至少一到兩次。
每個分割槽都有一個Leader和存放在各個Follower上的一到多個副本(即:資料的副本),此法可防止某個Broker的失效。
群集中的所有Broker都可以作為Leader和Follower,但是一個Broker最多隻能有一個Topic Partition的副本。Leader可被用來進行所有的讀寫操作。
Offset(偏移量)
單個分割槽中的每一條訊息都被分配一個Offset,它是一個單調遞增的整型數,可用來作為分割槽中訊息的唯一識別符號。
Consumer(消費者)
Consumer通過訂閱Topic partition,來讀取Kafka的各種Topic訊息。然後,消費類應用處理會收到訊息,以完成指定的工作。
Consumer group(消費組)
Consumer可以按照Consumer group進行邏輯劃分。Topic Partition被均衡地分配給組中的所有Consumers。
因此,在同一個Consumer group中,所有的Consumer都以負載均衡的方式運作。
換言之,同一組中的每一個Consumer都能群組看到分配給他的相應分割槽的所有訊息。如果某個Consumer處於“離線”狀態的話,那麼該分割槽將會被分配給同組中的另一個Consumer。這就是所謂的“再均衡(rebalance)”。
當然,如果組中的Consumer多於分割槽數,則某些Consumer將會處於閒置的狀態。
相反,如果組中的Consumer少於分割槽數,則某些Consumer會獲得來自一個以上分割槽的訊息。
Lag(延遲)
當Consumer的速度跟不上訊息的產生速度時,Consumer就會因為無法從分割槽中讀取訊息,而產生延遲。
延遲表示為分割槽頭後面的Offset數量。從延遲狀態(到“追趕上來”)恢復正常所需要的時間,取決於Consumer每秒能夠應對的訊息速度。
其公式如下:time=messages/(consume rate per second - produce rate per second)
1 針對Partitions
1)瞭解分割槽的資料速率,以確保提供合適的資料儲存空間
此處所謂“分割槽的資料速率”是指資料的生成速率。換言之,它是由“平均訊息大小”乘以“每秒訊息數”得出的資料速率決定了在給定時間內,所能保證的資料儲存空間的大小(以位元組為單位)。
如果你不知道資料速率的話,則無法正確地計算出滿足基於給定時間跨度的資料,所需要儲存的空間大小。
同時,資料速率也能夠標識出單個Consumer在不產生延時的情況下,所需要支援的最低效能值。
2) 除非有其他架構上的需要,否則在寫Topic時請使用隨機分割槽
在進行大型操作時,各個分割槽在資料速率上的參差不齊是非常難以管理的。
其原因來自於如下三個方面:
-
首先,“熱”(有較高吞吐量)分割槽上的Consumer勢必會比同組中的其他Consumer處理更多的訊息,因此很可能會導致出現在處理上和網路上的瓶頸。
-
其次,那些為具有最高資料速率的分割槽,所配置的最大保留空間,會導致Topic中其他分割槽的磁碟使用量也做相應地增長。
-
第三,根據分割槽的Leader關係所實施的最佳均衡方案,比簡單地將Leader關係分散到所有Broker上,要更為複雜。在同一Topic中,“熱”分割槽會“承載”10倍於其他分割槽的權重。
有關Topic Partition的使用,可以參閱《Kafka Topic Partition的各種有效策略》
參考連結:
ofollow,noindex" target="_blank"> https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/
2 針對Consumers
3) 如果Consumers執行的是比Kafka 0.10還要舊的版本,那麼請馬上升級
在0.8.x版中,Consumer使用Apache ZooKeeper來協調Consumer group,而許多已知的Bug會導致其長期處於再均衡狀態,或是直接導致再均衡演算法的失敗(我們稱之為“再均衡風暴”)。
因此在再均衡期間,一個或多個分割槽會被分配給同一組中的每個Consumer。
而在再均衡風暴中,分割槽的所有權會持續在各個Consumers之間流轉,這反而阻礙了任何一個Consumer去真正獲取分割槽的所有權。
4) 調優Consumer的套接字緩衝區(socket buffers),以應對資料的高速流入
在Kafka的0.10.x版本中,引數receive.buffer.bytes的預設值為64KB。而在Kafka的0.8.x版本中,引數socket.receive.buffer.bytes的預設值為100KB。
這兩個預設值對於高吞吐量的環境而言都太小了,特別是如果Broker和Consumer之間的網路頻寬延遲積(bandwidth-delay product)大於區域網(local areanetwork,LAN)時。
對於延遲為1毫秒或更多的高頻寬的網路(如10Gbps或更高),請考慮將套接字緩衝區設定為8或16MB。
如果記憶體不足,也至少考慮設定為1MB。當然,也可以設定為-1,它會讓底層作業系統根據網路的實際情況,去調整緩衝區的大小。
但是,對於需要啟動“熱”分割槽的Consumers來說,自動調整可能不會那麼快。
5) 設計具有高吞吐量的Consumers,以便按需實施背壓(back-pressure)
通常,我們應該保證系統只去處理其能力範圍內的資料,而不要超負荷“消費”,進而導致程序中斷“掛起”,或出現Consume group的溢位。
如果是在Java虛擬機器(JVM)中執行,Consumers應當使用固定大小的緩衝區,而且最好是使用堆外記憶體(off-heap)。
請參見Disruptor模式:
http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf
固定大小的緩衝區能夠阻止Consumer將過多的資料拉到堆疊上,以至於JVM花費掉其所有的時間去執行垃圾回收,進而無法履行其處理訊息的本質工作。
6) 在JVM上執行各種Consumers時,請警惕垃圾回收對它們可能產生的影響
例如,長時間垃圾回收的停滯,可能導致ZooKeeper的會話被丟棄、或Consumer group處於再均衡狀態。
對於Broker來說也如此,如果垃圾回收停滯的時間太長,則會產生叢集掉線的風險。
3 針對Producers
7) 配置Producer,以等待各種確認
籍此Producer能夠獲知訊息是否真正被髮送到了Broker的分割槽上。在Kafka的0.10.x版本上,其設定是Acks;而在0.8.x版本上,則為request.required.acks。
Kafka通過複製,來提供容錯功能,因此單個節點的故障、或分割槽Leader關係的更改不會影響到系統的可用性。
如果沒有用Acks來配置Producer(或稱“fireand forget”)的話,則訊息可能會悄然丟失。
8) 為各個Producer配置Retries
其預設值為3,當然是非常低的。不過,正確的設定值取決於你的應用程式,即:就那些對於資料丟失零容忍的應用而言,請考慮設定為Integer.MAX_VALUE(有效且最大)。
這樣將能夠應對Broker的Leader分割槽出現無法立刻響應Produce請求的情況。
9) 為高吞吐量的Producer,調優緩衝區的大小
特別是buffer.memory和batch.size(以位元組為單位)。由於batch.size是按照分割槽設定的,而Producer的效能和記憶體的使用量,都可以與Topic中的分割槽數量相關聯。
因此,此處的設定值將取決於如下幾個因素:
-
Producer資料速率(訊息的大小和數量);
-
要生成的分割槽數;
-
可用的記憶體量。
請記住,將緩衝區調大並不總是好事,如果Producer由於某種原因而失效了(例如,某個Leader的響應速度比確認還要慢),那麼在堆內記憶體(on-heap)中的緩衝的資料量越多,其需要回收的垃圾也就越多。
10) 檢測應用程式,以跟蹤諸如生成的訊息數、平均訊息大小、以及已使用的訊息數等指標
4 針對Brokers
11) 在各個Brokers上,請壓縮Topics所需的記憶體和CPU資源
日誌壓縮需要各個Broker上的堆疊(記憶體)和CPU週期都能成功地配合實現,而如果讓那些失敗的日誌壓縮資料持續增長的話,則會給Brokers分割槽帶來風險。
請參見:
https://kafka.apache.org/documentation/#compaction
你可以在Broker上調整log.cleaner.dedupe.buffer.size和log.cleaner.threads這兩個引數,但是請記住,這兩個值都會影響到各個Brokers上的堆疊使用。
如果某個Broker丟擲OutOfMemoryError異常,那麼它將會被關閉、並可能造成資料的丟失。
而緩衝區的大小和執行緒的計數,則取決於需要被清除的Topic Partition數量、以及這些分割槽中訊息的資料速率與金鑰的大小。
對於Kafka的0.10.2.1版本而言,通過ERROR條目來監控日誌清理程式的日誌檔案,是檢測其執行緒可能出現問題的最可靠方法。
12) 通過網路吞吐量來監控Brokers
請監控發向(transmit,TX)和收向(receive,RX)的流量,以及磁碟的I/O、磁碟的空間和CPU的使用率,而且容量規劃是維護群集整體效能的關鍵步驟。
13) 在群集的各個Brokers之間分配分割槽的Leader關係
Leader通常會需要大量的網路I/O資源。例如,當我們將複製因子(replication factor)配置為3、並執行起來時。
Leader必須首先獲取分割槽的資料,然後將兩套副本傳送給另兩個Followers,進而再傳輸到多個需要該資料的Consumers上。
因此在該例子中,單個Leader所使用的網路I/O,至少是Follower的四倍。而且,Leader還可能需要對磁碟進行讀操作,而Follower只需進行寫操作。
14) 不要忽略監控Brokers的in-sync replica(ISR)shrinks、under-replicatedpartitions和unpreferred leaders
這些都是叢集中潛在問題的跡象。例如,單個分割槽頻繁出現ISR收縮,則暗示著該分割槽的資料速率超過了Leader的能力,已無法為Consumer和其他副本執行緒提供服務了。
15) 按需修改Apache Log4j的各種屬性
詳細內容可以參考:
https://github.com/apache/kafka/blob/trunk/config/log4j.properties
Kafka的Broker日誌記錄會耗費大量的磁碟空間,但是我們卻不能完全關閉它。
因為有時在發生事故之後,需要重建事件序列,那麼Broker日誌就會是我們最好的、甚至是唯一的方法。
16) 禁用Topic的自動建立,或針對那些未被使用的Topics建立清除策略
例如,在設定的x天內,如果未出現新的訊息,你應該考慮該Topic是否已經失效,並將其從群集中予以刪除。此舉可避免花時間去管理群集中被額外建立的元資料。
17) 對於那些具有持續高吞吐量的Brokers,請提供足夠的記憶體,以避免它們從磁碟子系統中進行讀操作
我們應儘可能地直接從作業系統的快取中直接獲取分割槽的資料。然而,這就意味著你必須確保自己的Consumers能夠跟得上“節奏”,而對於那些延遲的Consumer就只能強制Broker從磁碟中讀取了。
18) 對於具有高吞吐量服務級別目標(service level objectives,SLOs)的大型群集,請考慮為Brokers的子集隔離出不同的Topic
至於如何確定需要隔離的Topics,則完全取決於自己的業務需要。例如,你有一些使用相同群集的聯機事務處理(multipleonline transaction processing,OLTP)系統。
那麼將每個系統的Topics隔離到不同Brokers子集中,則能夠有助於限制潛在事件的影響半徑。
19) 在舊的客戶端上使用新的Topic訊息格式。應當代替客戶端,在各個Brokers上載入額外的格式轉換服務
當然,最好還是要儘量避免這種情況的發生。
20) 不要錯誤地認為在本地主機上測試好Broker,就能代表生產環境中的真實效能了
要知道,如果使用複製因子為1,並在環回介面上對分割槽所做的測試,是與大多數生產環境截然不同的。
在環回介面上網路延遲幾乎可以被忽略的,而在不涉及到複製的情況下,接收Leader確認所需的時間則同樣會出現巨大的差異。
二、總結
希望上述各項建議能夠有助於大家更有效地去使用Kafka。如果你想提高自己在Kafka方面的專業知識,請進一步查閱Kafka配套文件中的“操作”部分,其中包含了有關操作群集等實用資訊。