RocketMQ專題1:入門
RocketMQ入門
原始碼和應用下載
這裡以RocketMQ的4.3.0版本為例,本地環境為windows10,jdk1.8, maven3.2.1.
原始碼下載地址:ofollow,noindex" target="_blank">http://mirrors.hust.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip
應用下載地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.0/rocketmq-all-4.3.0-bin-release.zip
啟動
Windows下需要配置環境變數,ROCKETMQ_HOME, 我這裡配置為:E:\software\rocketmq-all-4.3.0-bin-release
配置完環境變數後,就可以進入到bin目錄:
-
啟動server: 直接執行bin目錄下的
mqnamesrv.cmd
-
啟動broker: 執行
mqbroker.cmd
,發現一閃而過,檢視bin目錄下的bk.log日誌,發現錯誤日誌如下:錯誤: 找不到或無法載入主類 Files\Java\jdk1.8.0_121\lib;C:\Program
再檢視
mqbroker.cmd
原始碼,發現其最終呼叫了runbroker.cmd
。該指令碼的倒數第二行為:set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
知道問題所在: CLASSPATH的配置中是包含空格的,而空格導致最終解析出來的路徑錯誤。最終我修改倒數第二行為:
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
至此可以順利啟動
本以為啟動之後就能就行訊息收發了,於是我按照官網示例進入RocketMQ的bin目錄,並通過命令向broker傳送訊息:
tools org.apache.rocketmq.example.quickstart.Producer
結果一直報錯,搜尋得知在windows下需要配置環境變數NAMESRV_ADDR
為127.0.0.1:9876
配置完成之後,再依次啟動mqnamesrv和mqbroker,重新測試Producer發現Producer的輸出大致如下:
...... SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9C03E5, offsetMsgId=C0A8130100002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=0], queueOffset=249] SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9E03E6, offsetMsgId=C0A8130100002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=1], queueOffset=249] SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115EA003E7, offsetMsgId=C0A8130100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=2], queueOffset=249] 11:44:47.790 [NettyClientSelector_1] INFORocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10911] result: true 11:44:47.791 [NettyClientSelector_1] INFORocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true 11:44:47.793 [NettyClientSelector_1] INFORocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10909] result: true
在通過命令列執行Consumer:
tools org.apache.rocketmq.example.quickstart.Consumer
發現Consumer的輸出為:
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=249, sysFlag=0, bornTimestamp=1537242287776, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287778, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BDFE, commitLogOffset=179710, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409812, UNIQ_KEY=C0A8029D46D461BBE9BA5A115EA003E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 57], transactionId='null'}]] ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=248, sysFlag=0, bornTimestamp=1537242287768, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287768, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BB2E, commitLogOffset=178990, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409811, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9803E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53], transactionId='null'}]] ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=247, sysFlag=0, bornTimestamp=1537242287761, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287761, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B85E, commitLogOffset=178270, bodyCRC=684865321, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9103DF, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 49], transactionId='null'}]] ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=246, sysFlag=0, bornTimestamp=1537242287753, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287753, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B58E, commitLogOffset=177550, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E8903DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]]
關閉
關閉的步驟與啟動正好相反
-
關閉broker
:
mqshutdown broker
-
關閉namesrv
:
mqshutdown namesrv
簡單示例
在進行簡單的示例之前,我們先要知道為什麼會出現RocketMQ,下面一段話摘自RocketMQ官網:
Based on our research, with increased queues and virtual topics in use, ActiveMQ IO module reaches a bottleneck. We tried our best to solve this problem through throttling, circuit breaker or degradation, but it did not work well. So we begin to focus on the popular messaging solution Kafka at that time. Unfortunately, Kafka can not meet our requirements especially in terms of low latency and high reliability, see here for details. In this context, we decided to invent a new messaging engine to handle a broader set of use cases, ranging from traditional pub/sub scenarios to high volume real-time zero-loss tolerance transaction system. We believe this solution can be beneficial, so we would like to open source it to the community. Today, more than 100 companies are using the open source version of RocketMQ in their business. We also published a commercial distribution based on RocketMQ, a PaaS product called the Alibaba Cloud Platform.
可以知道RocketMQ是阿里在使用ActiveMQ時,出現了IO瓶頸,無法滿足阿里業務所需要的低延遲和高可靠性要求時自己研發出來。並且最終捐贈給Apache,成為頂級開源專案的。high volume real-time zero-loss tolerance transaction system
是其核心特點。
下面通過一個簡單的示例,來說明RocketMQ的基本使用:
引入pom依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
Producer
Producer一般分為三種模式: 同步、非同步和單向,具體程式碼如下:
public class SimpleProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException { /** * 同步訊息傳送: 一般用來進行通知、簡訊等重要訊息的同步 */ // syncProducer(); /** * 非同步訊息傳送: 一般用來對方法呼叫響應時間有較嚴格要求的情況下,非同步呼叫,立即返回 * 不同於同步的唯一在於: send方法呼叫的時候多攜帶一個回撥介面引數,用來非同步處理訊息傳送結果 */ asyncProducer(); /** * 單向模式: 一般用來對可靠性有一定要求的訊息傳送,例如日誌系統 * 不同於同步的唯一之處在於: 呼叫的是sendOneway方法,且該方法不會給呼叫者任何返回值 */ // oneWayProducer(); } private static void oneWayProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException { // STEP1: 建立Producer並且指定組名 DefaultMQProducer oneWayProducer = new DefaultMQProducer("GroupA"); // STEP2: 指定nameServer地址 oneWayProducer.setNamesrvAddr("localhost:9876"); // STEP3: 啟動Producer oneWayProducer.start(); // STEP4: 迴圈傳送訊息 for (int i = 0; i < 50; i++) { Message message = new Message("OneWayTopic", "TagA", ("OneWayMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); oneWayProducer.sendOneway(message); } // STEP5: 關閉Producer oneWayProducer.shutdown(); } private static void asyncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException { // STEP1: 建立Producer並且指定組名 DefaultMQProducer asyncProducer = new DefaultMQProducer("GroupA"); // STEP2: 指定nameServer地址 asyncProducer.setNamesrvAddr("localhost:9876"); // STEP3: 啟動Producer asyncProducer.start(); asyncProducer.setRetryTimesWhenSendAsyncFailed(0);// 設定非同步傳送失敗重試次數,預設為2 // STEP4: 迴圈傳送訊息 for (int i = 0; i < 50; i++) { Message message = new Message("AsyncTopic", "TagA", ("AsyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 建立回撥函式處理髮送成功或者異常 asyncProducer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } // STEP5: 關閉Producer TimeUnit.SECONDS.sleep(10); // 睡眠10秒,確保訊息都發送出去 asyncProducer.shutdown(); } private static void syncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException { // STEP1: 建立Producer並且指定組名 DefaultMQProducer syncProducer = new DefaultMQProducer("GroupA"); // STEP2: 指定nameServer地址 syncProducer.setNamesrvAddr("localhost:9876"); // STEP3: 啟動Producer syncProducer.start(); // STEP4: 迴圈傳送訊息 for (int i = 0; i < 50; i++) { Message message = new Message("SyncTopic", "TagA", ("SyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = syncProducer.send(message); System.out.println(sendResult); } // STEP5: 關閉Producer syncProducer.shutdown(); } }
Consumer
consumer的實現就較為簡單了,定義一個事件監聽介面即可.
public class SimpleConsumer { public static void main(String[] args) throws MQClientException { // STEP1: 建立預設Consumer並指定 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); // STEP2: 指定nameServer地址 consumer.setNamesrvAddr("localhost:9876"); // STEP3: 訂閱對應主題和tag consumer.subscribe("AsyncTopic", "*"); // STEP4: 註冊接收到broker訊息後的處理介面 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { System.out.println(new String(msgs.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // STEP5: 啟動consumer (必須在註冊完訊息監聽器之後啟動,否則會報錯) consumer.start(); System.out.println("Consumer started......"); } }
總結
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
參考連結
http://rocketmq.apache.org/docs/simple-example/