JStorm 原始碼分析 - Task 的啟動與執行
本篇主要介紹task的建立與執行過程
文章開頭, 先丟擲一些疑問:
- 為什麼TaskTransfer、TaskReceiver 要在初始化Task的時候建立, 為什麼不在Worker裡直接建立好? 這麼做有什麼好處呢?
- TaskTransfer/Receiver 與 Task 是1對1的關係麼? 如果是,為什麼? 如果不是,那設定了怎樣的值, 為什麼?
- start up bolt 發給系統 bolt 是做什麼用的?
- 至於在建立拓撲時的並行度等屬性, 在提交拓撲時(?確定是這個時候麼?)就已經分配好了. 例如 BoltA 並行度為 3, 那麼會建立3個Task物件, 每個物件都持有BoltA? 而不是一個Task 持有多個 Executor ,每個Executor使用執行bolt的業務程式碼 ?
- 為什麼建立executor時,要傳遞當前task進去,如 baseExecutor = new BoltExecutors(this) . 而不是傳遞必須的引數進去.這是否是為了強調task和executor之間的某種聯絡?如果是,那麼又是什麼聯絡呢? 在較早版本的原始碼裡面,是否有反映這個聯絡呢?
建立過程:
和worker的啟動方式類似 (JStorm對初始化方法和啟動方法使用了相同風格的命名, 減少了理解成本 ^_^)
- 執行 Task#new, 將必需的引數從workerData中取出 比較重要的一步: 獲取taskObj. 是通過Common#get_task_object獲取, 這個物件就是在建立拓撲時 new 出的Bolt/Spout物件 (原汁原味,沒有執行過prepare)
- 執行 Task#execute, 進行必要的初始化工作
- 傳送“start up” bolt 給系統bolt
- 建立 TaskTransfer. 作用是當 Bolt 中的業務程式碼呼叫 collect#emit 時, collect 內部使用 TaskTransfer 來發送訊息給下游 Task.
- 建立 Executor 並在 AsyncLoopThread 中啟動 詳見 jstrom executor
Executor 是真正執行 Bolt/Spout 的地方.- 呼叫 Bolt/Spout#prepare 來初始化, Collector 也是在此時初始化並傳遞給 Bolt 的
- 迴圈呼叫 Bolt/Spout#execute 來對訊息進行處理
- 若 Bolt/Spout 需傳送訊息, 需呼叫 Collector#emit, 訊息會經過 TaskTransfer 投遞給下游 Task
- 建立 TaskReceiver. 作用是反序列化其他 Worker 中的上游 Task 發來的訊息 (當前 Worker 的訊息在投遞時沒有序列化過,自然更不需要反序列化),並交由 Executor 消費.
執行過程:
上文寫到:task啟動時,建立 Executor 並在 AsyncLoopThread 中啟動. AsyncLoopThread 啟動後, 會在 while 迴圈中, 不斷地執行 Executor 物件的 run 方法.
Executor 物件有 3 個子類, 因為篇幅有限(為了偷懶), 這裡僅介紹 BoltExecutors 所作的操作, 其他子類的原理類似, 可以自己研究.
首先來檢視 BoltExecutors 的繼承關係:
- 由於實現了Runnable 介面, 因此 BoltExecutors 可以被 AsyncLoopThread 執行, 可以知道 BoltExecutors 程式碼執行的入口是 run 方法.
- 繼承了 EventHandler 介面 (這是LMAX 開發的 Disruptor 框架中提供的一個訊息消費介面), 這告訴我們 BoltExecutors 具有從DisruptorQueue 消費訊息的功能.
看run方法
BoltExecutors 會在 while 迴圈中不斷批量消費 exeQueue(DisruptorQueue) 中的訊息, 且呼叫時傳入的 eventHandler 是自身. exeQueue 中存放的是待消費的訊息 ( 上游Task emit的訊息最終會放在這個佇列中 ). 因此這裡的 consumeBatchWhenAvailable 會呼叫 BoltExecutors#onEvent 方法,如下所示:
程式碼過長,只截一部分.
BoltExecutors的onEvent 方法做了以下操作:
- 當 event 為 Tuple 時, 會呼叫 IBolt#execute 進行處理.
- Tuple 的型別可能是single 或 batch, 如果是 batch, 那就 for 迴圈裡呼叫 IBolt#execute
- 實際上會做一些更加細緻的判斷,例如:
- tuple 是否支援事務
- 是否是 IRichBolt 所 emit 的 tuple
- 是否是 system bolt
- 等等
- 實際上還會通過 Metric 做一些效能的記錄
不過主要邏輯是1、2兩點, 更加細緻的操作, 後續有空可能會分享出來^_^.