Kafka 原始碼系列之分組消費的再平衡策略
一, Kafka 消費模式
從 kafka 消費訊息, kafka 客戶端提供兩種模式 : 分割槽消費,分組消費。
分割槽消費對應的就是我們的 DirectKafkaInputDStream
分組消費對應的就是我們的 KafkaInputDStream
消費者數目跟分割槽數目的關係 :
1) ,一個消費者可以消費一個到全部分割槽資料
2) ,分組消費,同一個分組內所有消費者消費一份完整的資料,此時一個分割槽資料只能被一個消費者消費,而一個消費者可以消費多個分割槽資料
3) ,同一個消費組內,消費者數目大於分割槽數目後,消費者會有空餘 = 分割槽數 - 消費者數
二,分組消費的再平衡策略
當一個 group 中 , 有 consumer 加入或者離開時 , 會觸發 partitions 均衡 partition.assignment.strategy, 決定了 partition 分配給消費者的分配策略,有兩種分配策略:
1 , org.apache.kafka.clients.consumer.RangeAssignor
預設採用的是這種再平衡方式,這種方式分配只是針對消費者訂閱的 topic 的單個 topic 所有分割槽再分配, Consumer Rebalance 的演算法如下:
1), 將目標 Topic 下的所有 Partirtion 排序,存於 TP
2), 對某 Consumer Group 下所有 Consumer 按照名字根據字典排序,存於 CG ,第 i 個 Consumer 記為 Ci
3),N=size(TP)/size(CG)
4),R=size(TP)%size(CG)
5),Ci 獲取的分割槽起始位置 =N*i+min(i,R)
6),Ci 獲取的分割槽總數 =N+(if (i+ 1 > R) 0 else 1)
2 , org.apache.kafka.clients.consumer.RoundRobinAssignor
這種分配策略是針對消費者消費的所有 topic的所有分割槽進行分配。當有新的消費者加入或者有消費者退出,就會觸發rebalance。這種方式有兩點要求
A) ,在例項化每個消費者時給每個topic指定相同的流數
B) ,每個消費者例項訂閱的topic必須相同
Map <String , Integer> topicCountMap = new HashMap<String , Integer>() ;
topicCountMap.put( topic , new Integer( 1 )) ;
Map <String , List<KafkaStream< byte [] , byte []>>> consumerMap = consumer .createMessageStreams(topicCountMap) ;
其中, topic對應的value就是流數目。對應的kafka原始碼是在
在 kafka.consumer.ZookeeperConsumerConnector的consume方法裡,根據這個引數構建了相同數目的KafkaStream。
這種策略的具體分配步驟 :
1) , 對所有 topic 的所有分割槽按照 topic+partition 轉 string 之後的 hash 進行排序
2) , 對消費者按字典進行排序
3) , 然後輪訓的方式將分割槽分配給消費者
3 ,舉例對比
舉個例子,比如有兩個消費者 (c0,c1) ,兩個 topic(t0,t1) ,每個 topic 有三個分割槽 p(0-2) ,
那麼採用 RangeAssignor ,結果為:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
採用 RoundRobinAssignor ,結果為:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
三,本節原始碼設計的重要概念及 zookeeper 相關目錄
1 ,本節涉及的 zookeeper 目錄
A) , 消費者目錄 , 獲取子節點就可以獲取所有的消費者
/consumers/group.id/ids/
B) ,topic 的目錄,可以獲取 topic ,分割槽及副本資訊
/brokers/topics/topicName
值:
{"version":1,"partitions":{"0":[5,6],"2":[1,4],"27":[0,4],"1":[7,0]}}
partitions對應值的 key是分割槽id,value是Broker id列表。
C), 分割槽所屬的消費者執行緒關係
/consumers/groupId/owners/topic/partitionid
值就是消費者執行緒 id ,也就是在 A 向獲取的消費者後加了一個 id 值。
2 ,涉及的概念
A), 消費者 ID
val consumerIdString = {
var consumerUuid : String = null
config. consumerId match {
case Some ( consumerId ) // for testing only
=> consumerUuid = consumerId
case None // generate unique consumerId automatically
=> val uuid = UUID. randomUUID ()
consumerUuid = "%s-%d-%s" .format(
InetAddress. getLocalHost .getHostName , System. currentTimeMillis ,
uuid.getMostSignificantBits().toHexString.substring( 0 , 8 ))
}
config. groupId + "_" + consumerUuid
}
B), 消費者執行緒 ID
主要是在消費者 id 的基礎上,根據消費者構建指定的 topic 的 Stream 數目,遞增加了個數字的值
for ((topic , nConsumers) <- topicCountMap) {
val consumerSet = new mutable.HashSet[ConsumerThreadId]
assert (nConsumers >= 1 )
for (i <- 0 until nConsumers)
consumerSet += ConsumerThreadId (consumerIdString , i) //ConusmerId的結尾加上一個常量區別 owners 目錄下可以看到
consumerThreadIdsPerTopicMap.put(topic , consumerSet)
}
ConsumerThreadId
"%s-%d" .format(consumer , threadId)
C),TopicAndPartition
帶 topic 名字的表示每個分割槽,重點關注其 toString 方法,在比較的時候用到了。
TopicAndPartition(topic: String , partition: Int )
override def toString = "[%s,%d]" .format(topic , partition)
四,原始碼解析
1 , AssignmentContext
主要作用是根據指定的消費組,消費者, topic 資訊,從 zookeeper 上獲取相關資料並解析得到,兩種分配策略要用的四個資料結構。解析過程請結合 zookeeper 的相關目錄及節點的資料型別和 kafka 原始碼自行閱讀。
class AssignmentContext(group: String , val consumerId: String , excludeInternalTopics: Boolean, zkClient: ZkClient) {
//(topic,ConsumerThreadIdSet) //指定一個消費者,根據每個topic指定的streams數目,構建相同數目的流
val myTopicThreadIds : collection.Map[ String , collection.Set[ConsumerThreadId]] = {
val myTopicCount = TopicCount. constructTopicCount (group , consumerId , zkClient , excludeInternalTopics)
myTopicCount.getConsumerThreadIdsPerTopic
}
//(topic 分割槽) /brokers/topics/topicname 值
val partitionsForTopic : collection.Map[ String , Seq [ Int ]] =
ZkUtils. getPartitionsForTopics (zkClient , myTopicThreadIds .keySet.toSeq)
//(topic,ConsumerThreadId(當前消費者id)) ///consumers/Groupid/ids 子節點
val consumersForTopic : collection.Map[ String , List [ConsumerThreadId]] =
ZkUtils. getConsumersPerTopic (zkClient , group , excludeInternalTopics)
///consumers/Groupid/ids的所有的子節點
val consumers : Seq [ String ] = ZkUtils. getConsumersInGroup (zkClient , group).sorted
}
2 , RangeAssignor
class RangeAssignor() extends PartitionAssignor with Logging {
def assign (ctx: AssignmentContext) = {
val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition , ConsumerThreadId]()
for ((topic , consumerThreadIdSet) <- ctx. myTopicThreadIds ) {
val curConsumers = ctx. consumersForTopic (topic) //當前topic的所有消費者
val curPartitions: Seq [ Int ] = ctx. partitionsForTopic (topic) //當前topic的所有分割槽
//
val nPartsPerConsumer = curPartitions.size / curConsumers.size
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
info( "Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
" for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- consumerThreadIdSet) {
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //獲取當前消費者執行緒的在集合中的位置
assert (myConsumerPosition >= 0 )
//獲取分割槽的起始位置
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
//獲取當前消費者消費的分割槽數目
val nParts = nPartsPerConsumer + ( if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1 )
/**
* Range-partition the sorted partitions to consumers for better locality.
* The first few consumers pick up an extra partition, if any.
*/
if (nParts <= 0 )
warn( "No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
else {
//將分割槽關係描述寫入partitionOwnershipDecision
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
// record the partition ownership decision
partitionOwnershipDecision += ( TopicAndPartition (topic , partition) -> consumerThreadId)
}
}
}
}
partitionOwnershipDecision
}
}
3 , RoundRobinAssignor
class RoundRobinAssignor() extends PartitionAssignor with Logging {
def assign (ctx: AssignmentContext) = {
val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition , ConsumerThreadId]()
// check conditions (a) and (b) topic, List[ConsumerThreadId]
val (headTopic , headThreadIdSet) = (ctx. consumersForTopic .head._1 , ctx. consumersForTopic .head._2.toSet)
//測試輸出
ctx. consumersForTopic .foreach { case (topic , threadIds) =>
val threadIdSet = threadIds.toSet
require (threadIdSet == headThreadIdSet ,
"Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
"AND if the stream counts across topics are identical for a given consumer instance. \n " +
"Topic %s has the following available consumer streams: %s \n " .format(topic , threadIdSet) +
"Topic %s has the following available consumer streams: %s \n " .format(headTopic , headThreadIdSet))
}
//為傳入的集合建立一個迴圈迭代器,傳入之前排序是按照字典排序
val threadAssignor = Utils. circularIterator (headThreadIdSet.toSeq.sorted)
info( "Starting round-robin assignment with consumers " + ctx. consumers )
//TopicAndPartition 按照字串的hash排序
val allTopicPartitions = ctx. partitionsForTopic .flatMap { case (topic , partitions) =>
info( "Consumer %s rebalancing the following partitions for topic %s: %s"
.format(ctx.consumerId , topic , partitions))
partitions.map(partition => {
TopicAndPartition (topic , partition) //toString = "[%s,%d]".format(topic, partition)
})
}.toSeq.sortWith((topicPartition1 , topicPartition2) => {
/*
* Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
* up on one consumer (if it has a high enough stream count).
*/
//按照hash值進行排序
topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})
//過濾得到當前消費者的執行緒id
allTopicPartitions.foreach(topicPartition => {
val threadId = threadAssignor.next()
if (threadId.consumer == ctx.consumerId)
partitionOwnershipDecision += (topicPartition -> threadId)
})
//返回得到結果
partitionOwnershipDecision
}
}
五,總結
本文主要是講解分組消費的兩種將分割槽分配給消費者執行緒的分配策略。結合前面兩篇
<Kafka原始碼系列之Consumer高階API效能分析>和<Kafka原始碼系列之原始碼解析SimpleConsumer的消費過程>,大家應該會對kafka的java 消費者客戶端的實現及效能優缺點有徹底的瞭解了。
分組,分割槽兩種種模型其實跟kafka叢集並沒有關係,是我們java客戶端實現的區別。 生產中可以根據自己的需要選擇兩種消費模型。建議流量不是很大,也沒過分的效能需求,選擇分組消費,這樣同分組多消費者的話相當於實現了同分組的消費者故障轉移。
本文乃原創,希望大家尊重浪尖成功,不足之處請留言指正。
歡迎大家關注浪尖公眾號,一起開啟分散式之旅。