分散式系統訊息中介軟體——RabbitMQ的使用進階篇
分散式系統訊息中介軟體——RabbitMQ的使用進階篇
前言
上一篇文章 ( ofollow,noindex" target="_blank">https://www.cnblogs.com/hunternet/p/9668851.html ) 簡單總結了分散式系統中的訊息中介軟體以及RabbitMQ的基本使用,這篇文章主要總結一下RabbitMQ在日常專案開發中比較常用的幾個特性。
一 mandatory 引數
上一篇文章中我們知道,生產者將訊息傳送到RabbitMQ的交換器中通過RoutingKey與BindingKey的匹配將之路由到具體的佇列中以供消費者消費。那麼當我們通過匹配規則找不到佇列的時候,訊息將何去何從呢?Rabbit給我們提供了兩種方式。mandatory與備份交換器。
mandatory引數是channel.BasicPublish方法中的引數。其主要功能是訊息傳遞過程中不可達目的地時將訊息返回給生產者。當mandatory 引數設為true 時,交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,那麼RabbitMQ 會呼叫BasicReturn 命令將訊息返回給生產者。當mandatory 引數設定為false 時。則訊息直接被丟棄。其運轉流程與實現程式碼如下(以C# RabbitMQ.Client 3.6.9為例):
//連線與建立通道--後續的示例程式碼我們會省略掉這部分程式碼和釋放連線 ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection();//連線Rabbit IModel channel = conn.CreateModel();//建立通道 channel.ExchangeDeclare("exchangeName", "direct", true);//定義交換器 String queueName = channel.QueueDeclare("TestQueue", true, false, false, null).QueueName;//定義佇列 佇列名TestQueue,持久化的,非排它的,非自動刪除的。 channel.QueueBind(queueName, "exchangeName", "routingKey");//佇列繫結交換器 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchangeName", "routingKey", true, null, message);//釋出一個可以路由到佇列的訊息,mandatory引數設定為true var message1 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("exchangeName", "routingKey1", true, null, message);//釋出一個不可以路由到佇列的訊息,mandatory引數設定為true //生產者回調函式 channel.BasicReturn += (model, ea) => { //do something... 訊息若不能路由到佇列則會呼叫此回撥函式。 }; //關閉通道與連線 channel.close(); conn.close() ;
二 備份交換器
當訊息不能路由到佇列時,通過mandatory設定引數,我們可以將訊息返回給生產者處理。但這樣會有一個問題,就是生產者需要開一個回撥的函式來處理不能路由到的訊息,這無疑會增加生產者的處理邏輯。備份交換器(Altemate Exchange)則提供了另一種方式來處理不能路由的訊息。備份交換器可以將未被路由的訊息儲存在RabbitMQ中,在需要的時候去處理這些訊息。其主要實現程式碼如下:
IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("alternate-exchange", "altExchange"); channel.ExchangeDeclare("normalExchange", "direct", true, false, args);//定義普通交換器並新增備份交換器引數 channel.ExchangeDeclare("altExchange", "fanout", true, false, null);//定義備份交換器,並宣告為扇形交換器 channel.QueueDeclare("normalQueue", true, false, false, null);//定義普通佇列 channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey1");//普通佇列佇列繫結普通交換器 channel.QueueDeclare("altQueue", true, false, false, null);//定義備份佇列 channel.QueueBind("altQueue", "altExchange", "");//繫結備份佇列與交換器 var msg1 = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey1", false, null, msg1);//釋出一個可以路由到佇列的訊息,訊息最終會路由到normalQueue var msg2 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("normalExchange", "NormalRoutingKey2", false, null, msg2);//釋出一個不可以被路由的訊息,訊息最終會進入altQueue
備份交換器其實和普通的交換器沒有太大的區別,為了方便使用,建議設定為fanout型別,若設定為direct 或者topic的型別。需要注意的是,訊息被重新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是一樣的。考慮這樣一種情況,如果備份交換器的型別是direct,並且有一個與其繫結的佇列,假設繫結的路由鍵是key1,當某條攜帶路由鍵為key2 的訊息被轉發到這個備份交換器的時候,備份交換器沒有匹配到合適的佇列,則訊息丟失。如果訊息攜帶的路由鍵為keyl,則可以儲存到佇列中。
對於備份交換器,有以下幾種特殊情況:
- 如果設定的備份交換器不存在,客戶端和RabbitMQ 服務端都不會有異常出現,此時訊息會丟失。
- 如果備份交換器沒有繫結任何佇列,客戶端和RabbitMQ 服務端都不會有異常出現,此時訊息會丟失。
- 如果備份交換器沒有任何匹配的佇列,客戶端和RabbitMQ 服務端都不會有異常出現,此時訊息會丟失。
- 如果備份交換器和mandatory引數一起使用,那麼mandatory引數無效。
三 過期時間(TTL)
3.1 設定訊息的TTL
目前有兩種方法可以設定訊息的TTL。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息本身進行單獨設定,每條訊息的TTL可以不同。如果兩種方法一起使用,則訊息的TTL 以兩者之間較小的那個數值為準。訊息在佇列中的生存時間一旦超過設定的TTL值時,就會變成"死信" (Dead Message) ,消費者將無法再收到該訊息。(有關死信佇列請往下看)
通過佇列屬性設定訊息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl引數實現的,這個引數的單位是毫秒。示例程式碼下:
IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-message-ttl", 6000); channel.QueueDeclare("ttlQueue", true, false, false, args);
如果不設定TTL.則表示此訊息不會過期;如果將TTL設定為0 ,則表示除非此時可以直接將訊息投遞到消費者,否則該訊息會被立即丟棄(或由死信佇列來處理)。
針對每條訊息設定TTL的方法是在channel.BasicPublish方法中加入Expiration的屬性引數,單位為毫秒。關鍵程式碼如下:
BasicProperties properties = new BasicProperties() { Expiration = "20000",//設定TTL為20000毫秒 }; var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, properties, message);
注意:對於第一種設定佇列TTL屬性的方法,一旦訊息過期,就會從佇列中抹去,而在第二種方法中,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期是在即將投遞到消費者之前判定的。Why?在第一種方法裡,佇列中己過期的訊息肯定在佇列頭部, RabbitMQ 只要定期從隊頭開始掃描是否有過期的訊息即可。而第二種方法裡,每條訊息的過期時間不同,如果要刪除所有過期訊息勢必要掃描整個佇列,所以不如等到此訊息即將被消費時再判定是否過期,如果過期再進行刪除即可。
3.2 設定佇列的TTL
注意,這裡和上述通過佇列設定訊息的TTL不同。上面刪除的是訊息,而這裡刪除的是佇列。通過channel.QueueDeclare 方法中的x-expires引數可以控制佇列被自動刪除前處於未使用狀態的時間。這個未使用的意思是佇列上沒有任何的消費者,佇列也沒有被重新宣告,並且在過期時間段內也未呼叫過channel.BasicGet命令。
設定佇列裡的TTL可以應用於類似RPC方式的回覆佇列,在RPC中,許多佇列會被創建出來,但是卻是未被使用的(有關RabbitMQ實現RPC請往下看)。RabbitMQ會確保在過期時間到達後將佇列刪除,但是不保障刪除的動作有多及時。在RabbitMQ 重啟後, 持久化的佇列的過期時間會被重新計算。用於表示過期時間的x-expires引數以毫秒為單位, 井且服從和x-message-ttl一樣的約束條件,不同的是它不能設定為0(會報錯)。
示例程式碼如下:
IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-expires", 6000); channel.QueueDeclare("ttlQueue", false, false, false, args);
四 死信佇列
DLX(Dead-Letter-Exchange)死信交換器,當訊息在一個佇列中變成死信之後,它能被重新被髮送到另一個交換器中,這個交換器就是DLX ,繫結DLX的佇列就稱之為死信佇列。
訊息變成死信主要有以下幾種情況:
- 訊息被拒絕(BasicReject/BasicNack) ,井且設定requeue 引數為false;(消費者確認機制將會在下一篇文章中涉及)
- 訊息過期;
- 佇列達到最大長度。
DLX也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中存在死信時,RabbitMQ 就會自動地將這個訊息重新發布到設定的DLX上去,進而被路由到另一個佇列,即死信佇列。可以監聽這個佇列中的訊息、以進行相應的處理。
通過在channel.QueueDeclare 方法中設定x-dead-letter-exchange引數來為這個佇列新增DLX。其示例程式碼如下:
channel.ExchangeDeclare("exchange.dlx", "direct", true);//定義死信交換器 channel.ExchangeDeclare("exchange.normal", "direct", true);//定義普通交換器 IDictionary<String, Object> args = new Dictionary<String, Object>(); args.Add("x-message-ttl",10000);//定義訊息過期時間為10000毫秒 args.Add("x-dead-letter-exchange", "exchange.dlx");//定義exchange.dlx為死信交換器 args.Add("x-dead-letter-routing-key", "routingkey");//定義死信交換器的繫結key,這裡也可以不指定,則預設使用原佇列的路由key channel.QueueDeclare("queue.normal", true, false, false, args);//定義普通佇列 channel.QueueBind("queue.normal", "exchange.normal", "normalKey");//普通佇列交換器繫結 channel.QueueDeclare("queue.dlx", true, false, false, null);//定義死信佇列 channel.QueueBind("queue.dlx", "exchange.dlx", "routingkey");//死信佇列交換器繫結,若上方為制定死信佇列路由key則這裡需要使用原佇列的路由key //釋出訊息 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchange.normal", "normalKey", null, message) ;
以下為死信佇列的運轉流程:
五 延遲佇列
RabbitMQ本身並未提供延遲佇列的功能。延遲佇列是一個邏輯上的概念,可以通過過期時間+死信佇列來模擬它的實現。延遲佇列的邏輯架構大致如下:
生產者將訊息傳送到過期時間為n的佇列中,這個佇列並未有消費者來消費訊息,當過期時間到達時,訊息會通過死信交換器被轉發到死信佇列中。而消費者從死信佇列中消費訊息。這個時候就達到了生產者釋出了訊息在講過了n時間後消費者消費了訊息,起到了延遲消費的作用。
延遲佇列在我們的專案中可以應用於很多場景,如:下單後兩個訊息取消訂單,七天自動收貨,七天自動好評,密碼凍結後24小時解凍,以及在分散式系統中訊息補償機制(1s後補償,10s後補償,5m後補償......)。
六 優先順序佇列
就像我們生活中的“特殊”人士一樣,我們的業務上也存在一些“特殊”訊息,可能需要優先進行處理,在生活上我們可能會對這部分特殊人士開闢一套VIP通道,而Rabbit同樣也有這樣的VIP通道(前提是在3.5的版本以後),即優先順序佇列,佇列中的訊息會有優先順序優先順序高的訊息具備優先被消費的特權。針對這些VIP訊息,我們只需做兩件事:
我們只需做兩件事情:
- 將佇列宣告為優先順序佇列,即在建立佇列的時候新增引數 x-max-priority 以指定最大的優先順序,值為0-255(整數)。
- 為優先順序訊息新增優先順序。
其示例程式碼如下:
channel.ExchangeDeclare("exchange.priority", "direct", true);//定義交換器 IDictionary<String, Object> args = new Dictionary<String, Object>(); args.Add("x-max-priority", 10);//定義優先順序佇列的最大優先順序為10 channel.QueueDeclare("queue.priority", true, false, false, args);//定義優先順序佇列 channel.QueueBind("queue.priority", "exchange.priority", "priorityKey");//佇列交換器繫結 BasicProperties properties = new BasicProperties() { Priority =8,//設定訊息優先順序為8 }; var message = Encoding.UTF8.GetBytes("TestMsg8"); //釋出訊息 channel.BasicPublish("exchange.priority", "priorityKey", properties, message);
注意:沒有指定優先順序的訊息會將優先順序以0對待。 對於超過優先順序佇列所定最大優先順序的訊息,優先順序以最大優先順序對待。對於相同優先順序的訊息,後進的排在前面。如果在消費者的消費速度大於生產者的速度且Broker 中沒有訊息堆積的情況下, 對傳送的訊息設定優先順序也就沒有什麼實際意義。因為生產者剛傳送完一條訊息就被消費者消費了,那麼就相當於Broker 中至多隻有一條訊息,對於單條訊息來說優先順序是沒有什麼意義的。
關於優先順序佇列,好像違背了佇列這種資料結構先進先出的原則,其具體是怎麼實現的在這裡就不過多討論。有興趣的可以自己研究研究。後續可能也會有相關的文章來分析其原理。
七 RPC 實現
RPC,是Remote Procedure Call 的簡稱,即遠端過程呼叫。它是一種通過網路從遠端計算機上請求服務,而不需要了解底層網路的技術。RPC 的主要功用是讓構建分散式計算更容易,在提供強大的遠端呼叫能力時不損失本地呼叫的語義簡潔性。
有關RPC不多介紹,這裡我們主要介紹RabbitMQ如何實現RPC。RabbitMQ 可以實現很簡單的RPC。客戶端傳送請求訊息,服務端回覆響應的訊息,為了接收響應的訊息,我們需要在請求訊息中傳送一個回撥佇列(可以使用預設的佇列)。其伺服器端實現程式碼如下:
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); channel.QueueDeclare("RpcQueue", true, false, false, null); SimpleRpcServer rpc = new MySimpRpcServer(new Subscription(channel, "RpcQueue")); rpc.MainLoop(); }
public class MySimpRpcServer: SimpleRpcServer { public MySimpRpcServer(Subscription subscription) : base(subscription) { } /// <summary> /// 執行完成後進行回撥 /// </summary> public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes("我收到了!"); } /// <summary> /// 進行處理 /// </summary> /// <param name="evt"></param> public override void ProcessRequest(BasicDeliverEventArgs evt) { // todo..... base.ProcessRequest(evt); } }
客戶端實現程式碼如下:
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); SimpleRpcClient client = new SimpleRpcClient(channel, "RpcQueue"); var message = Encoding.UTF8.GetBytes("TestMsg8"); var result = client.Call(message); //do somethings...
以上是Rabbit客戶端自己幫我們封裝好的Rpc客戶端與服務端的邏輯。當然我們也可以自己實現,主要是藉助於BasicProperties的兩個引數。
- ReplyTo: 通常用來設定一個回撥佇列。
- CorrelationId : 用來關聯請求(request) 和其呼叫RPC 之後的回覆(response) 。
其處理流程如下:
- 當客戶端啟動時,建立一個匿名的回撥佇列。
- 客戶端為RPC 請求設定2個屬性: ReplyTo用來告知RPC 服務端回覆請求時的目的佇列,即回撥佇列; Correlationld 用來標記一個請求。
- 請求被髮送到RpcQueue佇列中。
- RPC 服務端監聽RpcQueue佇列中的請求,當請求到來時,服務端會處理並且把帶有結果的訊息傳送給客戶端。接收的佇列就是ReplyTo設定的回撥佇列。
- 客戶端監昕回撥佇列,當有訊息時,檢查Correlationld 屬性,如果與請求匹配,那就是結果了。
結束語
本篇文章簡單介紹了RabbitMQ在我們專案開發中常用的幾種特性。這些特性可以幫助我們更好的將Rabbit用於我們不同的業務場景中。這些特性與示例,可以自己在程式中執行一下,然後通過檢視Rabbit提供的web管理介面來驗證其正確性(關於web管理介面不多介紹,相信大家稍微研究研究就能明白)。當然,關於Rabbit的使用,仍有許多地方在本文中沒有提及,如:RabbitMQ的特色——確認機制、持久化......將在下一篇文章中再詳細介紹。