RabbitMQ快速入門
一、前言
RabbitMQ其實是我最早接觸的一個MQ框架,我記得當時是在大學的時候跑到圖書館一個人去看,由於RabbitMQ官網的英文還不算太難,因此也是參考官網學習的,一共有6章,當時是用Node來開發的,當時花了一下午看完了,也理解了。而現在回過頭來再看,發現已經忘記了個差不多了,現在再回過頭來繼續看看,然乎記之。以防再忘,讀者看時最好有一定的MQ基礎。
二、RabbitMQ
首先我們需要知道的是RabbitMQ它是基於高階佇列協議(AMQP)的,它是Elang編寫的,下面將圍繞RabbitMQ佇列、交換機、RPC三個重點進行展開。
2.1、佇列
儲存訊息的地方,多個生產者可以將訊息傳送到一個佇列,多個消費者也可以消費同一個佇列的訊息。
注意:當多個消費者監聽一個佇列,此時生產者傳送訊息到佇列只有一個消費者被消費,並且消費端的消費方式是按照消費端在內部啟動的順序輪詢(round-robin)。
2.2、消費者
消費訊息的一方
public class Send { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }catch (Exception e){ e.printStackTrace(); } } }
public class Recv { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }catch (Exception e){ e.printStackTrace(); } } }
2.3、小結
1、Rabbit是如何保證訊息被消費的?
答:通過ack機制。每當一個訊息被消費端消費的時候,消費端可以傳送一個ack給RabbitMQ,這樣RabbitMQ就知道了該條訊息已經被完整消費並且可以被delete了。;如果一條訊息被消費但是沒有傳送ack,那麼此時RabbitMQ將會認為需要重新消費該訊息,如果此時還有其它的消費者,那麼此時RabbitMQ將會把這條訊息交給它處理。
注意:開啟ack機制的是autoAck=
false
;
2、訊息如何進行持久化?
- 將queue持久化,即設定 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二個引數durable為true
- 設定訊息持久化,即設定MessageProperties.PERSISTENT_TEXT_PLAIN
注意:訊息持久化並不一定保證訊息不會被丟失
3、RabbitMQ如何避免兩個消費者一個非常忙一個非常閒的情況?
通過如下設定,保證一個消費者一次只能消費一個訊息,只有當它消費完成並且返回ack給RabbitMQ之後才給它派發新的訊息。
int prefetchCount = 1 ; channel.basicQos(prefetchCount)
4、RabbitMQ異常情況下如何保證訊息不會被重複消費?
需要業務自身實現密等性,RabbitMQ沒有提供比較好的方式去保證。
2.2、交換機
在RabbitMQ中,生產者其實從來不會發送訊息到佇列,甚至,它不知道訊息被髮送到了哪個佇列。那它被髮送到了哪裡呢?就是本節的重點:交換機,下面就是它在RabbitMQ中的介紹圖。(X就是交換機)生產者傳送訊息給交換機,然後由交換機將訊息轉發給佇列。
從上圖就產生一個問題:X怎麼將訊息發給queue呢?它是把訊息發給所有queue還是發給一個指定的queue或者丟棄訊息呢?這就是看交換機的型別了。下面一起談談這幾種型別
2.2.1、fanout
fanout:廣播模式,這個比較好理解,就是所有的佇列都能收到交換機的訊息。
如上面,兩個佇列都能收到交換機的訊息。
2.2.2、direct
這個模式相當於釋出/訂閱模式的一種,當交換機型別為direct的時候,此時我們需要設定兩個引數:
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二個引數,我們可以把它稱呼為routeKey
- channel.queueBind(queueName, EXCHANGE_NAME, "");第三個引數,我們把它稱呼為bindKey
有了這兩個引數,我們就可以指定我們訂閱哪些訊息了。
如圖,Q1訂閱了orange的訊息,Q2訂閱了black、green的訊息。
2.2.3、topic
其實topic和direct有一點類似,它相當於對direct作了增強。在direct中,我們上面所說的bind routeKey為black、green的它是有限制的,它只能絕對的等於routeKey,但是有時候我們的需求不是這樣,我們可能想要的是正則匹配即可,那麼Topic就派上用場了。
當型別為topic時,它的bindKey對應字串需要是以“.”分割,同時RabbitMQ還提供了兩個符號:
- 星號(*):表示1個單詞
- 井號(#):表示0、多個單詞
上圖的意思是:所有第二個單詞為orange的訊息傳送個Q1,所有最後一個單詞為rabbit或者第一個單詞為lazy的訊息傳送給Q2。
2.2.4、header
這一種型別官方demo沒有過多解釋,這裡也不研究了。
2.3、RPC
RabbitMQ 還可以實現RPC(遠端過程呼叫)。什麼是RPC,簡單來說就是local呼叫remote方法。對應於RabbitMQ中則是Client傳送一個request message,Server處理完成之後將其返回給Client。這裡就有了一個疑問?Server是如何將response返回給Client的,這裡RabbitMQ定義了一個概念:Callback Queue。
Callback Queue
注意這個佇列是獨一無二的String replyQueueName = channel.queueDeclare().getQueue();
。
首先我們需要明白一點的是為什麼需要這個queue?我們知道在RabbitMQ作訊息佇列的時候,Client只需要將訊息投放到queue中,然後Server從queue去取就可以了。但是在RabbitMQ作為RPC的時候多了一點就是,Client還需要返回結果,這時Server端怎麼知道把訊息傳送給Client,這就是Callback Queue的用處了。
Correlation Id
在上面我們知道Server返回資料給Client是通過Callback Queue的,那麼是為每一個request都建立一個queue嗎?這未免太過浪費資源,RabbitMQ有更好的方案。在我們傳送request,繫結一個唯一ID(correlationId),然後在訊息被處理返回的時候取出這個ID和發出去的ID進行匹配。這樣來說一個Callback Queue是Client級別而不是request級別的了。
實現
上面介紹了RabbitMQ實現RPC最重要的兩個概念,具體程式碼比較簡單還是貼下把。
client 端
public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) throws Exception{ RPCClient fibonacciRpc = new RPCClient(); try { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (Exception e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } }
服務端
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
三、總結
這次回頭再看RabbitMQ,再次重新理解了以下RabbitMQ,有些東西還是要慢慢嚼的。當然這些也都是官網的入門例子,後續有機會的話再深入研究。