Kafka 原始碼系列之 Broker 的 IO 服務及業務處理
Kafka 原始碼系列之 Broker 的 IO 服務及業務處理
一, kafka 角色
Kafka 原始碼系列主要是以 kafka 0.8.2.2 原始碼為例。以看 spark 等原始碼的經驗總結除了一個重要的看原始碼的思路:先了解部件角色和功能角色,然後逐個功能請求序列畫圖分析,最後再彙總。那麼,下面再囉嗦一下, kafka 的角色。 kafka 在生產中的使用,如下圖。
從圖中可以看到其主要角色:
1 , Zookeeper:Broker 需要通過 ZooKeeper 記錄叢集的所有 Broker , controller 等資訊,記錄 Consumer 的消費訊息的偏移量等資訊。
2 , Broker: 主要負責管理資料,處理資料的生產、消費請求及副本的同步等資訊。
3 , Topic: 標識一個類別的訊息。
4 , Partition: 針對 topic 進行了進一步細分,增加併發度。牽涉到副本及 leader 選舉。
5 , Producer: 主要與 Broker 進行互動,來生產訊息到 broker 。
6 , Consumer: 主要是從 Broker 上獲取訊息,將自己的消費偏移等資訊記錄與 zookeeper 。
從各個角色的功能來看,我們整個資料服務請求的中心就是 Broker ,自然也是由 Broker 來負責各種事件處理及應答各個部件的。
二, Broker 請求及應答機制的實現
在 JAVA 的網路 IO 模型徹底講解的那篇文章裡,已經徹底講解了 Java 的各種網路 IO 實現的機制及優缺點。其實, kafka 的 Broker 就是通過 JAVA 的 NIO 來實現監聽和請求處理及應答的。
主要牽涉到的類:
1) ,KafkaServer
該類代表了一個 kafka Broker 的生命週期,處理 kafka 啟動或者停止所需要的所有功能。
2) ,SocketServer
一個 NIO 服務中心。執行緒模型是
1 個 Acceptor 執行緒,用來處理新的連結請求
N 個加工 Processor 執行緒。每個執行緒擁有一個他們自己的 selector ,主要負責 IO 請求及應答。
3) ,KafkaRequestHandler
實際會在 KafkaRequestHandlerPool 中建立多個物件,負責加工處理 request 執行緒。
會建立 M 個處理 Handler 執行緒。負責處理 request 請求,將 responses 重新寫會加工執行緒 Processor ,以便於其寫回給客戶端。
4) ,RequestChannel
該類主要是封裝了 requestQueue , responseQueues , responseListeners ,便於個各類中同時引用並作出自己的處理。
5) ,KafkaApis
Kafka 多樣請求的邏輯處理程式。
具體如圖:
下面講解 1,2,3,4,5 ,具體含義:
1 , SocketServer.startup() ,會啟動一個後臺執行緒,該執行緒會持有一個 acceptor ,負責接收新的連結請求,並輪訓所有的 Processor ,將新的連結請求加入 Processor 物件的成員變數 ConcurrentLinkedQueue 裡, Processor 會在其 run 方法裡面處理。
// start accepting connections
this . acceptor = new Acceptor(host , port , processors , sendBufferSize , recvBufferSize , quotas)
Utils. newThread ( "kafka-socket-acceptor" , acceptor , false ).start()
acceptor .awaitStartup
Processor 池的初始化
for (i <- 0 until numProcessorThreads) {
processors (i) = new Processor(i ,
time ,
maxRequestSize ,
aggregateIdleMeter ,
newMeter( "IdlePercent" , "percent" , TimeUnit. NANOSECONDS , Map( "networkProcessor" -> i.toString)) ,
numProcessorThreads ,
requestChannel ,
quotas ,
connectionsMaxIdleMs)
Utils. newThread ( "kafka-network-thread-%d-%d" .format(port , i) , processors (i) , false ).start()
}
accepttor輪訓Processor
val ready = selector.select(500) if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() while(iter.hasNext && isRunning) { var key: SelectionKey = null try { key = iter.next iter.remove() if(key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) }
2 , Processor 的 run 方法裡面,會針對可讀事件呼叫 read 方法裡將 request 請求資訊通過 requestChannel.sendRequest(req) 新增到 RequestChannel 的成員變數裡面。
requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
3 ,在 KafkaServer 的 startup 方法裡面構建 KafkaRequestHandlerPool 物件的時候,會構建若干 handler 執行緒。
for (i <- 0 until numThreads) {
runnables (i) = new KafkaRequestHandler(i , brokerId , aggregateIdleMeter , numThreads , requestChannel , apis)
threads (i) = Utils. daemonThread ( "kafka-request-handler-" + i , runnables (i))
threads (i).start()
}
在 KafakRequestHandler 的方法裡面會對我們的 request 進行處理
req = requestChannel.receiveRequest( 300 )
apis.handle(req)
實際上,是通過 KafkaApis 物件的 handle 方法進行各種邏輯的處理的。
def handle (request: RequestChannel.Request) {
try {
trace( "Handling request: " + request. requestObj + " from client: " + request.remoteAddress)
request. requestId match {
case RequestKeys. ProduceKey => handleProducerOrOffsetCommitRequest(request)
case RequestKeys. FetchKey => handleFetchRequest(request)
case RequestKeys. OffsetsKey => handleOffsetRequest(request)
case RequestKeys. MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys. LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys. StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys. UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys. ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys. OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys. OffsetFetchKey => handleOffsetFetchRequest(request)
case RequestKeys. ConsumerMetadataKey => handleConsumerMetadataRequest(request)
case requestId => throw new KafkaException( "Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
request. requestObj .handleError(e , requestChannel , request)
error( "error when handling request %s" .format(request. requestObj ) , e)
} finally
request. apiLocalCompleteTimeMs = SystemTime. milliseconds
}
4 ,在每一種請求處理結束之後會產生對應的 response
requestChannel.sendResponse( new RequestChannel.Response(request , new BoundedByteBufferSend(response)))
並將 response 儲存到 RequestChannel 的 responseQueues 儲存。
5 ,最終,由我們的 Processor 在其 run 方法裡面,取出 RequestChannel 的 responseQueues 儲存的時間,匹配到寫事件,然後通過其 write 方法對具體的 request 進行應答。
else if (key.isWritable)
write(key)
三,總結
這是一個典型的 Reactor 多執行緒模型,並且實現了 IO 執行緒和業務執行緒進行隔離。這樣做的優點有以下幾種 :
1, 充分利用資源
可以充分利用 CPU 資源,增加併發度,使業務響應速度加快。
2, 故障隔離 :
業務處理執行緒,無論是處理耗時,還是發生阻塞,都不會影響 IO 請求執行緒。保證伺服器能在某些業務執行緒出故障的情況下,正常進行 IO 請求應答。
3, 可維護性
職責單一,可維護性高,方便定位問題。
此處再次建議大家仔細閱讀,浪尖關於 JAVA 的網路 IO 模型徹底講解那篇文章,徹底領會其意境。
此乃,原創。歡迎大家掃描二維碼,關注浪尖微信公眾號,大家共同進步。