RocketMQ訊息傳送的高可用設計
從rocketmq topic的建立機制可知,一個topic對應有多個訊息佇列,那麼我們在傳送訊息時,是如何選擇訊息佇列進行傳送的?假如這時有broker宕機了,rocketmq是如何規避故障broker的?看完這篇文章,相信你會從文中找到答案。
rocketmq在傳送訊息時,由於nameserver檢測broker是否還存活是有延遲的,在選擇訊息佇列時難免會遇到已經宕機的broker,又或者因為網路原因傳送失敗的,因此rocketmq採取了一些高可用設計的方案,主要通過兩個手段:重試與Broker規避 。
重試機制
直接定位到client端傳送訊息的方法:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) { // ... }
在client端,傳送訊息的方式有:同步(SYNC)、非同步(ASYNC)、單向(ONEWAY)。
那麼可以知道,retryTimesWhenSendFailed決定同步方法重試次數,預設重試次數為3次。
重試機制提高了訊息傳送的成功率。
選擇佇列的方式
我在這裡提出一個問題:
現在有個由兩個broker節點組成的叢集,有topic1,預設在每個broker上建立4個佇列,分別是:master-a(q0,q1,q2,q3)、master-b(q0,q1,q2,q3),上一次傳送訊息到master-a的q0佇列,此時master-a宕機了,如果繼續傳送topic1訊息,rocketmq如果避免再次傳送到master-a?
以上問題引出了rocketmq傳送訊息時如何選擇佇列的一些機制,選擇佇列有兩種方式,通過sendLatencyFaultEnable的值來控制,預設值為false,不啟動broker故障延遲機制,值為true時啟用broker故障延遲機制。
預設機制
sendLatencyFaultEnable=false,訊息傳送選擇佇列呼叫以下方法:
org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
這裡的lastBrokerName指的是上一次執行訊息傳送時選擇失敗的broker,在重試機制下,第一次執行訊息傳送時,lastBrokerName=null,直接選擇以下方法:
org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue:
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
sendWhichQueue是一個利用ThreadLocal本地執行緒儲存自增值的一個類,自增值第一次使用Random類隨機取值,此後如果訊息傳送出發重試機制,那麼每次自增取值。
此方法直接用sendWhichQueue自增獲取值,再與訊息佇列的長度進行取模運算,取模目的是為了迴圈選擇訊息佇列。
如果此時選擇的佇列傳送訊息失敗了,此時重試機制在再次選擇佇列時lastBrokerName不為空,回到最開始的那個方法,還是利用sendWhichQueue自增獲取值,但這裡多了一個步驟,與訊息佇列的長度進行取模運算後,如果此時選擇的佇列所在的broker還是上一次選擇失敗的broker,則繼續選擇下一個佇列。
我們再細想一下,如果此時有broker宕機了,在預設機制下很可能下一次選擇的佇列還是在已經宕機的broker,沒有辦法規避故障的broker,因此訊息傳送很可能會再次失敗,重試傳送造成了不必要的效能損失。
所以rocketmq採用Broker故障延遲機制來規避故障的broker。
Broker故障延遲機制
sendLatencyFaultEnable=true,訊息傳送選擇佇列呼叫以下方法:
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // Broker故障延遲機制 if (this.sendLatencyFaultEnable) { try { // 自增取值 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 佇列位置值取模 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 校驗佇列是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } // 嘗試從失敗的broker列表中選擇一個可用的broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { // 從失敗條目中移除已經恢復的broker latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 預設機制 return tpInfo.selectOneMessageQueue(lastBrokerName); }
該方法利用sendWhichQueue的自增取值的方式輪詢選擇佇列,與預設機制一致,不同的是多了判斷是否可用,呼叫了latencyFaultTolerance.isAvailable(mq.getBrokerName())判斷,其中肯定內涵機關,所以我們需要從延遲機制的幾個核心類找突破口。
下面我會從原始碼的角度詳細地分析rocketmq是如何實現在一定時間內規避故障broker的。
從傳送訊息方法原始碼看出,在傳送完訊息,會呼叫updateFaultItem方法:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
// 3.執行真正的訊息傳送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
傳送訊息時捕捉到異常同樣會呼叫updateFaultItem方法,它是延遲機制的核心方法:
endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
其中endTimestamp - beginTimestampPrev等於訊息傳送需要用到的時間,如果成功傳送第三個引數傳的是false,傳送失敗傳true,下面繼續看updateFaultItem方法的實現原始碼:
org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem:
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }
其中引數currentLatency為本次訊息傳送的延遲時間,isolation表示broker是否需要規避,所以訊息成功傳送表示broker無需規避,訊息傳送失敗時表示broker發生故障了需要規避。
latencyMax和notAvailableDuration是延遲機制演算法的核心值,每次傳送訊息的延遲,它們也決定了失敗條目中的startTimestamp的值。
從方法可看出,如果broker需要隔離,訊息傳送延遲時間預設為30s,再利用這個時間從latencyMax尾部向前找到比currentLatency小的陣列下標index,如果沒有找到就返回0,我們看看latencyMax和notAvailableDuration這兩個陣列的預設值:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
可看出,如果isolation=true,該broker會得到一個10分鐘規避時長,如果isolation=false,那麼規避時長就得看訊息傳送的延遲時間是多少了,我們繼續往下擼:
org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem:
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { // 從快取中獲取失敗條目 FaultItem old = this.faultItemTable.get(name); if (null == old) { // 如果快取不存在,新建失敗條目 final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); // broker開始可用時間=當前時間+規避時長 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { // 更新舊的失敗條目 old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { // 更新舊的失敗條目 old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
FaultItem為儲存故障broker的類,稱為失敗條目,每個條目儲存了broker的名稱、訊息傳送延遲時長、故障規避開始時間。
該方法主要是對失敗條目的一些更新操作,如果失敗條目已存在,那麼更新失敗條目,如果失敗條目不存在,那麼新建失敗條目,其中失敗條目的startTimestamp為當前系統時間加上規避時長,startTimestamp是判斷broker是否可用的時間值:
org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable:
public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
寫在最後
經過一波原始碼的分析,我相信你已經找到了你想要的答案,這個故障延遲機制真的是一個很好的設計,我們看原始碼不僅僅要看爽,爽了之後我們還要思考一下這些優秀的設計思想在平時寫程式碼的過程中是否可以借鑑一下?