Pulsar-Producer實現簡介
“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”
Pulsar是pub-sub模式的分散式訊息平臺,擁有靈活的訊息模型和直觀的客戶端API。
Pulsar由雅虎開發並開源的下一代訊息系統,目前是Apache軟體基金會的孵化器專案。
本片文章簡單介紹Pulsar的Producer,包含以下內容:
- Producer的設計
- 訊息傳送的實現
1. Producer設計
1.1 建立Producer
以上是Pulsar官網上建立一個Producer的示例程式碼。
建立的過程如下:
- 指定serviceUrl建立PulsarClient
- 指定Producer傳送訊息的Topic,通過PulsarClient建立Producer
通過上述的建立程式碼可以推測:
- serviceUrl應該是用於做服務發現的,通過serviceUrl查詢Broker的資訊
- Producer指定了Topic,那麼一個Producer只能往特定的Topic傳送訊息
1.2 Producer API
Pulsar中,傳送相關的介面為Producer,如上圖所示:
- Producer定義了傳送介面
- ProducerBase作為抽象類,提供了基礎實現
- ProducerImpl則是真正的實現類
- PartitionedProducerImpl看著和分割槽相關,這個之後再看
Producer 介面具體如下:
public interface Producer<T> extends Closeable { /** * 返回Producer傳送訊息的Topic */ String getTopic(); /** * Producer的名稱 */ String getProducerName(); /** * 同步傳送訊息 */ MessageId send(T message) throws PulsarClientException; /** * 有傳送訊息 */ CompletableFuture<MessageId> sendAsync(T message); /** * Flush客戶端完成中的訊息並等待所有訊息成功持久化 * @since 2.1.0 * @see #flushAsync() */ void flush() throws PulsarClientException; /** * 非同步Flush客戶端完成中的訊息並等待所有訊息成功持久化 * @since 2.1.0 * @see #flush() */ CompletableFuture<Void> flushAsync(); /** * 建立TypedMessageBuilder,用於構建訊息 */ TypedMessageBuilder<T> newMessage(); /** * 同步傳送訊息,已經被棄用 */ @Deprecated MessageId send(Message<T> message) throws PulsarClientException; /** * 非同步傳送訊息,已經被棄用 */ @Deprecated CompletableFuture<MessageId> sendAsync(Message<T> message); /** * 獲取Producer傳送的最後一個序列號 */ long getLastSequenceId(); /** * 獲取Producer的統計資訊 */ ProducerStats getStats(); /** * 非同步關閉Producer並且釋放資源 */ CompletableFuture<Void> closeAsync(); /** * 返回Producer是否連線到Broker上 */ boolean isConnected(); }
通過Producer介面可以看出Pulsar Producer提供的能力:
- 同步傳送訊息
- 非同步傳送訊息
- 一個Producer只能向一個特定的Topic傳送訊息(Producer#topic()返回了一個Topic,說明Producer會繫結到一個Topic上)
- 批量傳送(flush方法說明了應該是支援批量的,訊息會在客戶端記憶體中儲存)
- 包含了sequenceId是否可以做冪等之類的事情?
- 統計能力
1.3 ProducerBase
ProducerBase作為抽象類,實現了Producer介面。
ProducerBase包含四個屬性:
- producerCreatedFuture:非同步建立Producer的Future
- conf:Producer的配置
- schema:訊息相關的Schema資訊
- interceptors:Producer的攔截器,在傳送前後插入一些操作
producerCreatedFuture
重新命名上看這個屬性是用於非同步建立Producer。
但是在一個基類中提供非同步建立實體的Future顯得比較難理解。一般的程式設計思路會在基類中定義一些基礎的公共的屬性,用於儲存狀態或者配置,比如conf。這裡的producerCreatedFuture實際用於PartitionedProducerImpl非同步建立多個Producer,這個後續再看。
conf
ProducerConfigurationData提供了Producer相關的配置資訊,包含是否批量傳送、記憶體快取訊息的大小、傳送的Timeout等。
schema
Schema指明瞭訊息的格式,通過Schema完成對訊息的encode和decode。
interceptors
ProducerInterceptor是Producer提供的攔截器,包含兩個方法:beforeSend、onSendAcknowledgement,分別用於在傳送前和傳送後插入行為。
1.4 ProducerImpl
ProducerImpl繼承了ProducerBase,是Producer介面的核心實現。
ProducerImpl在ProducerBase的基礎上增加了大量的屬性,包含:
- producerId:通過AtomicLong生成的程序內唯一的Producer ID
- msgIdGenerator:訊息ID
- pendingMessages:記憶體中快取的訊息
- pendingCallbacks:記憶體中快取的訊息對應的Callback
- timeout:傳送的超時配置
- batchMessageContainer:批量訊息的容器
- producerName:全域性唯一的Producer名稱
- 等等...(在後續傳送實現中介紹相關的屬性)
ProducerImpl實現了具體的傳送行為,比如同步傳送、非同步傳送(後續在訊息傳送的實現部分介紹)。
1.5 PartitionedProducerImpl
Producer提供的傳送相關的API定義,ProducerBase提供了基礎實現,ProducerImpl提供了具體的實現,那麼PartitionedProducerImpl做什麼?
通過PartitionedProducerImpl的屬性可以看到內部包含了一個ProducerImpl列表,那麼可以PartitionedProducerImpl和ProducerImpl是一個組合的關係。
通過start方法可以看出,PartitionedProducerImpl根據對應的topicMetadata的分割槽數建立了對應數量的ProducerImpl例項(這裡也說明了ProducerBase中producerCreatedFuture的用途)。
為什麼在PartitionedProducerImpl中需要建立一組ProducerImpl例項?
PartitionedProducerImpl另外增加了一個routerPolicy屬性,其介面為:
public interface MessageRouter extends Serializable { @Deprecated default int choosePartition(Message<?> msg) { throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead"); } default int choosePartition(Message<?> msg, TopicMetadata metadata) { return choosePartition(msg); } }
通過介面的定義不難理解MessageRouter介面實現Message和Partition的對映。
通過internalSendAsync方法的實現可以看出,傳送訊息時通過routerPolicy將訊息對映到Partition,通過Partition選擇對應的Producer執行傳送,那麼久解釋了為什麼在PartitionedProducerImpl會建立和對應Topic的分割槽數相同的ProducerImpl。
通過以上內容,能總結出Producer模組的各個類的職責:
- Producer:定義傳送介面,使用者使用的核心API
- ProducerBase:Producer介面的基礎實現
- ProducerImpl:實現具體的傳送行為,一個ProducerImpl只能向一個Topic寫入訊息
- PartitionedProducerImpl:整合多個ProducerImpl,用於向多分割槽傳送訊息的場景
2. 訊息傳送的實現
在對Producer模組有個整體的認識後,後續內容具體闡述一條訊息的傳送流程。
在訊息系統中,從Producer的視角看,一條訊息寫入過程一般包含:
- 訊息校驗
- 訊息屬性增強(新增一些必要的系統屬性)
- 訊息路由(選擇目標分割槽)
- 訊息序列化
- 訊息資料寫入網路
- 等待寫入結果響應
- 返回寫入結果
下面將通過ProducerImpl的實現來了解Pulsar的Producer傳送訊息的過程。
2.1 定址
要傳送一條訊息,除了校驗訊息是否合法,首先要這條訊息的寫入目標(通過路由找到訊息目標的Partition)。
在ProducerImpl的構造方法最後一行呼叫了grabCnx()方法建立了連結(構建了連結的上下文)。
grabCnx方法通過PulsarClient建立Connection,而PubsarClient內部則通過LookupService介面來完成Topic到Broker的對映並建立連結。
LookupService介面提供了BinaryProtoLookupService和HttpLookupService實現,通過LookupService使用者也可以實現自己的服務發現模組。
2.2 訊息傳送
傳送訊息的呼叫鏈如上圖所示,最終通過ProducerImpl的internalSendAsync將訊息傳送出去。無論同步傳送還是非同步傳送,最終都會通過非同步的方式執行傳送(同時只是在非同步的基礎上等待發送結果),這裡可以看到Pulsar Producer在API實現上比較注重程式碼的複用性即API的最小功能原則。
以單挑訊息傳送為例,sendAsync的具體實現如下:
- 在必要的校驗後,將訊息包裝成OpSendMsg物件(包含非同步傳送完成後的Callback)
- 將訊息新增到pendingMessages
- 通過Connection的EventLoop執行傳送操作
ProducerImpl將在ackReceived方法中處理服務端對寫入訊息的響應,通過訊息的sequenceId來識別對應的OpSendMsg,並呼叫對應Callback來執行回撥邏輯。實際在Callback完成了響應使用者的操作及傳送行為的一些統計。
ProducerImpl只會建立一個連結,且傳送和ACK都是通過synchronized執行的,所以中間通過pendingMessages來完成訊息傳送和響應的對應,以及超時的處理。這塊具體可以看一下程式碼實現。
總結
本文介紹了Pulsar Producer模組的設計,包含各個類的職責,並簡單介紹了訊息的傳送過程。Puslar Producer在設計上和RocketMQ的思想差異還是比較大的,比如Puslar Producer會將Producer對應到分割槽上,每個分割槽有自己的Producer,這樣可以比較容易完成一些冪等之類的操作。