訊息中介軟體客戶端消費控制實踐
本文來自網易雲社群,轉載務必請註明出處。
背景
訊息中介軟體是用來系統間通訊、非同步解耦、削峰填谷的重要手段,個人認為一個比較靠譜的Mq,應該具備以下特點
-
控制投遞:訊息消費失敗,支援訊息有節奏的重新投遞
-
延遲消費:支援訊息延遲消費,用來解決諸如訊息亂序的場景
-
流控消費:消費支援流控,真正的支援削峰填谷
-
消費監控:訊息消費的監控
目前考拉常用的訊息中介軟體有rabbitMq和kafka,各自都有一些問題,不能完美勝任以上功能,因此本文試著探索並實踐了一個訊息控制的框架。
知己知彼
RabbitMq
先拿rabbitMq來說點事兒,這個mq的問題比較大,但是目前交易核心的訊息都在rabbitMq上面。
對比原始需求
-
控制投遞:訊息消費失敗後,訊息會被服務端無節制投遞,一但處理邏輯有bug,觸發無限投遞,瞬間伺服器會被訊息打爆。
-
延遲消費:亂序問題是mq共用的問題,rabbitMq並沒有提供解決方案
-
流控消費:訊息物件儲存在mq例項記憶體中,因此rabbitMq本身不支援堆積太多訊息。
-
消費監控:控制檯資料簡陋
Kafka
然後拿kafka做下對比,kafka效能、擴充套件性都優於rabbitMq,不過也不能完全滿足我們的需求
對比原始需求
-
控制投遞:kafka消費有offset的概念,通過消費者程式碼實現可以主動控制消費節奏。
-
延遲消費:kafka也沒有提供結局方案
-
流控消費:kafka訊息資料落在磁碟上,可以堆積比較多的訊息,但是對於消費方怎麼流控並沒有提供方案。
-
消費監控:資料也比較簡陋
設計思路
最理想情況,這些功能可以直接做在服務端上面,客戶端不用做太多改造。不過,考慮現實情況,沒辦法直接去改kafka和rabbitMq原始碼,只能退而求其次去改造訊息客戶端,在訊息客戶端和消費者之間增加一個訊息控制框架。
訊息控制框架主要結構如下:
另外針對持久化到客戶端的資料,還結合k-scheduler提供了一個訊息重推模組,如下:
下面針對原始需求,看看訊息控制框架都做了什麼事情:
-
控制投遞:訊息控制層catch消費異常,rabbitMq的訊息會直接持久化訊息後續重推,kafka訊息異常,重置offset,有節制的重推。
-
延遲消費:訊息適配層提供延遲推送介面,需要業務方識別出訊息亂序後,呼叫介面,介面會直接持久化亂序訊息,在指定延遲時間後重新推送。
-
流控消費:對於rabbitMq訊息,控制層提供單機流控介面,被流控的訊息直接持久化到DB,等待後續重推。另外針對kafka訊息,集成了nfc全域性流控,框架識別流控錯誤碼,有節制的重推訊息。
-
消費監控:對接哨兵監控,所有訊息消費、失敗、流控等資料都會採集到哨兵
詳細實現
RabbitMq訊息的詳細實現
rabbitMq和kafka都有各自特點,因此雖然整體框架的思路是一致的,但是一些細節處理還是略有不同,此處先拿rabbitMq的實現來作分析:
先上圖
如圖展示了一條rabbitMq訊息是如何經過訊息控制框架的,異常訊息、延遲推送以及被流控的訊息都會落庫,然後等待重推。
被持久化的訊息主要包含以下欄位
-
應用名
-
業務名
-
協議名(kafka或者rabbitMq)
-
環境名(預發或者線上或者beta)
-
訊息體
-
訊息重推時間
-
當前重試次數(根據重試次數實現了一個退避演算法,來計算下次重推時間)
-
訊息狀態
其中,為了表明一個訊息和消費者之間的歸屬關係,提出了一個 消費者分組 的概念。
一個消費者分組包含應用名、業務名、協議名以及環境名,可以對應到唯一的消費者 。
重推邏輯如下
重推任務依賴於外部驅動,可以是cron可以是k-schedule,動動手指配置一下就ok。
目前重推任務只支援單機重試,因此大批量的訊息重推消費速度不能得到保證。
kafka訊息的差異實現
kafka本身可以堆積訊息,因此摒棄了流控落庫的邏輯,直接重置offset,有節奏的重試。另外,集成了nfc的全域性流控,kafka的消費者直接使用nfc全域性流控。
此外,對於kafka異常訊息的處理,框架也是利用offset來重試,沒有落庫。
監控示例
核心程式碼實現
核心類圖
針對交易訊息做了定製化處理,對於kafka交易訊息對接方只需要繼承實現AbstractTradeKafkaControlProcessor,對於rabbitMq型別交易訊息繼承實現AbstractTradeRabbitControlListener即可。
AbstractControlListener中核心的訊息控制程式碼如下,AbstractTradeKafkaControlProcessor中有針對kafka的特點做改動,不再贅述。
/** * 訊息處理流程 * * @param message * @param controlDTO */ protected void processControlMessage(T message, ControlDTO controlDTO) { BizIdTypeBond bizIdTypeBond = buildBizIdTypeBond(message); MonitorNameSpace monitorNameSpace = buildMonitorNameSpace(bizIdTypeBond);// 統計訊息處理個數 MonitorFactory.getMonitorService().onNewMessage(monitorNameSpace, 1, false);// 是否流控 boolean needRelease = false;if (isOpenFlowControl()) { flowControlService.aquireResource(); needRelease = true; }// 執行業務邏輯 try { onControlMessage(message, controlDTO);if (controlDTO.getDelayPush() != null) {// 延遲推送 storeService.storeMessage(encodeStoreMessage(message, bizIdTypeBond), controlDTO.getDelayPush(),"delay push"); MonitorFactory.getMonitorService().onStoreMessage4DelayPush(monitorNameSpace, 1, false); NotifyConstants.NOTIFY_LOG.warn("delay push messageDTO=", message); } } catch (Throwable t) {// 異常控制 String note = NotifyCommonUtil.buildCallStatck(t, 500); storeService.storeMessage(encodeStoreMessage(message, bizIdTypeBond), null, note); MonitorFactory.getMonitorService().onStoreMessage4Exception(monitorNameSpace, 1, false, note); NotifyConstants.NOTIFY_LOG.warn("process failed messageDTO=", message);return; } finally {if (needRelease) { flowControlService.releaseResoure(); } } }複製程式碼
對接示例
xml配置
<bean id="globalControlConfig" > <property name="applicationName" value="order"/> <property name="enviroment" value="${message.control.environment}"/> <property name="tableName" value="tb_mq_message_control"/> <property name="dataSource" ref="rdsDataSource"/> </bean> <!--交易事件變更監聽器--> <bean id="tradeEventListener" > <property name="notifyControlConfig" ref="notifyControlConfigTrade"/> </bean> <bean id="notifyControlConfigTrade" > <property name="bizGroup" value="trade"/> </bean> <!-- 交易事件兜底重試任務 -->複製程式碼
<bean id="retryTaskEntry" class="com.netease.haitao.notify.base.task.runner.RetryTaskEntry"/>複製程式碼程式碼示例
rabbitMq
public class TradeEventListener extends AbstractTradeRabbitControlListener { @Resource private OrderComposeConfigHolder orderComposeConfigHolder; @Resource private TradeEventService tradeEventService; @Override protected boolean isOpenMessageControl() { return orderComposeConfigHolder.isOpenTradeMessageControl(); } @Override protected int flowControlThreshold() { return orderComposeConfigHolder.tradeEventFlowControlThreshold(); } @Override protected boolean isOpenFlowControl() { return orderComposeConfigHolder.isOpenFlowControl(); } @Override public void onControlTradeEvent(TradeEvent tradeEvent, ControlDTO controlDTO) throws Exception { OrderComposeLogConstants.notifyLog.info("onTradeEvent,message=" + tradeEvent.toString()); try { tradeEventService.processTradeEvent(tradeEvent); } catch (OrderComposeException e) { if (e.getErrorCode().equals(OrderComposeErrorEnum.TRADE_EVENT_WRONG_ORDER.intValue())) { OrderComposeLogConstants.notifyLog.warn("message wrong order,tradeEvent=" + tradeEvent.toString() + ",delayPush=" + orderComposeConfigHolder.tradeEventWrongOrderDelayPushTime()); // 亂序之後的延遲消費 controlDTO.setDelayPush(orderComposeConfigHolder.tradeEventWrongOrderDelayPushTime()); } else { OrderComposeLogConstants.notifyLog.warn("process failed,tradeEvent=" + tradeEvent.toString(), e); throw e; } } catch (Exception e) { OrderComposeLogConstants.notifyLog.warn("process failed,tradeEvent=" + tradeEvent.toString(), e); throw e; } } }複製程式碼
kafka
/** * 訂單建立訂單佔用庫存訊息處理 */ public class OrderInvUnpayCloseEventProcessor extends AbstractTradeKafkaControlProcessor<UnpayCancelEvent> { @Resource private OrderInvModule orderInvModule; @Override @GlobalResource(resourceName = NfcResources.orderInvNotify) public void onControlMessage(UnpayCancelEvent unpayCancelEvent, ControlDTO controlDTO) throws Exception { OrderComposeLogConstants.notifyLog.info("OrderInvCreateEventProcessor,unpayCancelEvent=" + unpayCancelEvent); orderInvModule.processUnpayClose(unpayCancelEvent.getGorderId()); } }複製程式碼
複製程式碼
本文來自網易雲社群 ,經作者程漢授權釋出。
網易雲免費體驗館,0成本體驗20+款雲產品!
更多網易研發、產品、運營經驗分享請訪問網易雲社群。