RabbitMQ(三):訊息持久化策略
一、前言
在正常的伺服器執行過程中,時常會面臨伺服器宕機重啟的情況,那麼我們的訊息此時會如何呢?很不幸的事情就是,我們的訊息可能會消失,這肯定不是我們希望見到的結果。所以我們希望AMQP伺服器崩潰了也可以將訊息恢復,這稱之為訊息持久化。RabbitMQ自然存在這種策略可以幫助我們完成這件事情。
二、持久化的訊息
當RabbitMQ伺服器重啟後,原先的佇列和交換器會隨同裡面的訊息一同消失。原因在於每個佇列和交換器都有durable屬性,該屬性預設是false,它決定了RabbitMQ是否需要在崩潰或者重啟之後重新建立佇列或者交換器。將它設定為true就代表了永續性,在伺服器重啟之後就會重新持久的建立佇列和交換器。
當然做到這點還不夠,我們需要的是持久化的訊息,所以在訊息釋出前,通過將訊息的“投遞模式”(delivery mode)屬性設定為2將訊息標記為持久化。到目前為止,訊息還只是被表示為持久化,還需要被髮布到持久化的交換器中併到達持久化的佇列中才行。如果不是這樣,包含持久化訊息的佇列或者交換器揮著Rabbit崩潰重啟後不復存在,導致訊息成為一個孤兒。因此,總結起來需要做到以下三點:
(1)將訊息的投遞模式選項設定為2(持久);
(2)將訊息傳送到持久化的交換器;
(3)訊息到達持久化的佇列。
注意,如果原先有非持久的交換器或者佇列,需要刪除後才可重新建立,否則就建立其他名稱的交換器或者佇列,程式碼如下:
//宣告持久交換器 channel.ExchangeDeclare( "HelloExchange",//交換器名稱 ExchangeType.Direct,//交換器型別 true,//是否持久話 false,//是否自動刪除 null//關於交換器的詳細設定,鍵值對形式 ); //宣告持久佇列 channel.QueueDeclare( "HelloQueue",//佇列名稱 true,//是否持久化 false,//是否只對首次宣告的佇列可見 false,//是否自動刪除 null////關於佇列和佇列內訊息的詳細設定,鍵值對形式 ); //釋出持久訊息 string msg_str = "這是生產者第一次釋出的訊息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//釋出的資料型別 msg_pro.DeliveryMode = 2;//標記持久化
三、事務
目前為止,我們已經將訊息、佇列和交換器設定為持久化。但是事實上還存在著'最後一英里'的距離,就是在把訊息寫入磁碟前,訊息由於伺服器宕機而消失該如何?這時候就需要使用到事務,說到事務就會想到SQL中的事務,但是不能搞混了。AMQP中,在把通道設定為事務模式後,通過通道傳送訊息後還有多個其他的AMQP命令,這些命令是執行還是忽略,取決於訊息的傳送是否成功,訊息傳送成功通道會在事務中完成其他AMQP命令,就可以提交事務了,傳送失敗則其他AMQP命令將不會執行,我們也會知道傳送失敗,而採取相應的措施。事務保證瞭解決這最後的問題。
程式碼如下:
using (IConnection conn = conn_factory.CreateConnection()) { //2.建立通道 using (IModel channel = conn.CreateModel()) { try { channel.TxSelect();//宣告事務 //3.釋出訊息 string msg_str = "這是生產者釋出的訊息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//釋出的資料型別 msg_pro.DeliveryMode = 2; channel.BasicPublish( "HelloExchange",//訊息傳送目標交換器名稱 "hola",//路由鍵 msg_pro,//訊息的釋出屬性 Encoding.UTF8.GetBytes(msg_str)//訊息 ); channel.TxCommit();//提交事務 } catch(Exception ex) { channel.TxRollback();//回滾事務 } } }
四、傳送方確認模式
雖然通過事務和持久化的訊息、佇列和交換器可以確保訊息不會丟失,但是對訊息的吞吐量有著非常嚴重的影響,而且使用訊息通訊就是為了避免同步,可是事務卻會導致生產者程式產生同步。所以,有一個更好的方法保證訊息投遞:傳送方確認模式。和事務類似,我們需要將通道channel設定為confirm模式,而且只能通過重新建立通道來關閉該設定。一旦通道進入confirm模式,所有的通道上釋出的訊息都會被指派一個唯一的ID。當訊息被投遞到佇列後,通道就會發送一個傳送方確認模式給生產者程式,使得生產者知道訊息安全到達隊列了。
傳送發確認模式最大的好處是它們是非同步的,沒有回滾的概念,更加輕量級,對效能的影響也幾乎忽略不計。
程式碼如下:
channel.ConfirmSelect();//開啟發送確認模式 //3.釋出訊息 IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//釋出的資料型別 msg_pro.DeliveryMode = 2; for(int i = 0; i < 5; i++) { string msg_str = string.Format("這是生產者釋出的訊息{0}", i); channel.BasicPublish( "HelloExchange",//訊息傳送目標交換器名稱 "hola",//路由鍵 msg_pro,//訊息的釋出屬性 Encoding.UTF8.GetBytes(msg_str)//訊息 ); if (channel.WaitForConfirms()) Console.WriteLine(i); else Console.WriteLine("傳送失敗"); }
可以看到channel.WaitForConfirms()方法是同步的,這樣的話效率會第一點,我們可以傳送完所有的訊息,然後用channel.WaitForConfirmsOrDie()一次性提交,如果中途有一個訊息提交失敗或者超時,就會報錯Exception,需要全部重新提交。
五、小結
訊息持久化的策略大致就是以上幾種,我們可以根據自己的實際需求來選擇相應的策略。如果有問題歡迎指出!