無鏡--kafka之服務端--時間輪
時間輪
Kafka中存在大量的延遲操作,比如延遲生產,延遲拉取,延遲加入,延遲心跳等。kafka使用時間輪(TimingWheel)來來實現管理延遲任務和超時時完成延遲任務。
時間輪(TimingWheel)是儲存定時任務的環形佇列,底層採用陣列實現,陣列中的每個元素可以存放一個定時任務列表(TimerTaskList)。定時任務列表(TimerTaskList)是一個雙向連結串列,連結串列中的每一項都是一個定時任務項(TimerTaskEntry),定時任務項中封裝的就是真的定時任務(TimerTaskEntry(比如延遲操作))。
時間輪由多個時間格(槽)組成,每個時間格(槽)代表當前時間輪的基本時間跨度(tickMs)。時間輪的時間格(槽)個數是固定的,通過wheelSize設定,時間輪的總體時間跨度interval=tickMs*wheelSize。錶盤指標currentTime表示時間輪當前所處的時間,currentTime將整個時間輪劃分為到期部分和未到期部分,currentTime當前指向的時間格屬於到期部分,剛好到期,需要處理此時間格所對應的TimerTaskList的所有任務。
時間輪的tickMs=1ms,wheelSize=20,interval=20ms,初始情況下表盤指標currentTime指向時間格0,此時有一個延遲時間為2ms的任務進來,會存放到時間格為2的定時任務列表(TimerTaskList)中。隨著時間不斷的推移,currentTime不斷向前推進,過了2ms之後,到達時間格2時,就需要將時間格2所對應的TimeTaskList中的任務做相應的到期操作。此時若又有一個延遲時間為8ms的任務插入進來,則會存放到時間格為10的任務列表(TimerTaskList)中,從中可以看出定時任務存放到哪個時間格中,是從當前錶盤指標指向的時間格開始算起。不是從初始時間格開始算起。 那麼同時有一個延遲時間為19ms的任務,那麼這個時候,這個任務就會存放到時間格為1中。
總結一下:只要延遲時間沒有超過當前時間輪的跨度,都會儲存在當前時間輪中,選擇時間格從當前錶盤指標指向的時間格的tickMs開始計算。這樣隨著錶盤指標(currentTime)的不斷推進,當前時間輪處理的時間段也在不斷的後移,時間範圍在currentTime於currentTime+interval之間。
當延遲時間超過了時間輪的跨度,如果是直接擴充當前時間輪的時間格(wheelSize的大小),那麼在kafka的這種級別的延遲任務,那麼這個時間輪會很大,比如100萬毫秒,這種情況下時間輪就會佔用很大的空間,並且走一輪的話,花費的時間也比較長。所以當延時時間超過了當前時間輪的跨度(interval),kafka會重新建立一個時間輪:第一個時間輪會持有第二個時間輪的引用,第一個時間輪的interval為第二個時間輪的tickMs。所以第二個時間輪:tickMs=20ms,wheelSize=20,interval=400ms。
這是來了一個延遲時間是350ms的延遲任務,第一層時間輪是不能滿足的,於是只能想更高一層的時間輪來儲存,第二層時間輪的跨度interval大於延遲時間,第二層可以滿足儲存這個延遲任務,所以這個350ms會被存到到第二層時間輪的17時間格中。時間格的超時時間是tickMs。但是時間格定時任務列表(TimerTaskList)中的延遲任務是一個連結串列,連結串列中的延遲任務的超時時間不全是tickMs,比如第二層時間輪的第17個時間格中就會儲存延遲時間是340ms到360ms的延遲任務。所以當TimerTaskList超時之後,如果連結串列裡面還有不能執行的延遲任務,就有一個時間輪的降級操作,比如第二層時間輪的第17個時間格在340ms到期後,裡面360ms還有20ms才能執行到期操作,這時第一層的時間輪可以存放這個還有20ms到期的延遲任務,所以這個延遲任務就修改自己延遲時間,並被再次加入到第一層的時間格中。再過20ms後,真正到期,最終執行相應的到期操作。
怎麼判斷任務過期?
我們把時間輪的時間格(槽)放入到DelayQueue中,因為每個時間格(槽)都有統一的一個過期時間,這個過期的槽會被DelayQueue的poll彈出,我們只需要將槽中的所有任務迴圈一次,重新加入到新的槽中,新增失敗就立即執行。
參考: