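Learning RocketMQ from Scratch: Cluster Setup
Using two servers, we will build a highly available RocketMQ cluster with two masters and two slaves and no single point of failure. Assume the two servers have the physical IP addresses 192.168.50.1 and 192.168.50.2.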
Contents
1. Start the NameServer cluster
2. Start the Broker cluster
3. RocketMQ visual management console: rocketmq-console
4. Test the cluster
1. Start the NameServer cluster
Start a NameServer on each of the two servers. Together they form a NameServer service with no single point of failure, reachable at 192.168.50.1:9876 and 192.168.50.2:9876.
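Before moving on to the Brokers, it can help to confirm that both NameServer addresses actually accept TCP connections. The following is a minimal sketch, not part of the original setup: the class name NamesrvAddrCheck and its helper methods are hypothetical, it uses only the JDK, and it only checks that the port is open, not that a healthy NameServer is listening behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: parses a "host:port;host:port" list and probes each entry.
class NamesrvAddrCheck {

    /** Splits a semicolon-separated "host:port" list into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing or doubled semicolon
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing port in: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Build a resolved address here; connecting to an unresolved one would fail.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the two NameServer addresses assumed in this article.
        String namesrvAddr = args.length > 0 ? args[0] : "192.168.50.1:9876;192.168.50.2:9876";
        for (InetSocketAddress address : parse(namesrvAddr)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 2000));
        }
    }
}
```

Run it from any machine that should be able to reach the cluster. A result of reachable=false usually points to a NameServer that is not running or a firewall rule that blocks port 9876.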
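2. Start the Broker cluster
Edit the Broker configuration files so that each server can run one Broker in the Master role and one Broker in the Slave role.
First, locate the Broker configuration files. We are building a cluster in synchronous double-write mode, so the files to edit are the broker configuration files under the 2m-2s-sync directory: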
[root@157-89 ~]# cd /usr/local/rocketmq-all-4.3.2-bin-release/conf/
[root@157-89 conf]# ls
2m-2s-async  2m-2s-sync  2m-noslave  broker.conf  logback_broker.xml  logback_namesrv.xml  logback_tools.xml
[root@157-89 conf]# cd 2m-2s-sync/
[root@157-89 2m-2s-sync]# ls
broker-a.properties  broker-a-s.properties  broker-b.properties  broker-b-s.properties
1) On server 192.168.50.1, edit broker-a.properties to define a Broker in the Master role:
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.1
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/abort
2) On server 192.168.50.2, edit broker-b.properties to define a Broker in the Master role:
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.2
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/abort
3) On server 192.168.50.1, edit broker-b-s.properties to define a Broker in the Slave role (the slave of broker-b):
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.1
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/abort
4) On server 192.168.50.2, edit broker-a-s.properties to define a Broker in the Slave role (the slave of broker-a):
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.2
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/abort
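The four files follow one pattern: a master and its slave share brokerClusterName and brokerName, the master uses brokerId=0 with a *_MASTER role, and the slave uses a non-zero brokerId with brokerRole=SLAVE. A rough sketch of a sanity check for that pattern is shown below. The class BrokerPairCheck and its methods are hypothetical helpers written for this article, built only on java.util.Properties; they are not part of RocketMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: checks that a master/slave pair of broker configs is consistent.
class BrokerPairCheck {

    /** Parses broker .properties text into a Properties object. */
    static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** Returns the problems found in a master/slave pair; an empty list means none were found. */
    static List<String> check(Properties master, Properties slave) {
        List<String> problems = new ArrayList<>();
        // A slave is matched to its master by cluster name and broker name.
        for (String key : new String[] {"brokerClusterName", "brokerName"}) {
            if (!master.getProperty(key, "").equals(slave.getProperty(key, ""))) {
                problems.add(key + " differs between master and slave");
            }
        }
        if (!"0".equals(master.getProperty("brokerId"))) {
            problems.add("master brokerId should be 0");
        }
        if ("0".equals(slave.getProperty("brokerId", "0"))) {
            problems.add("slave brokerId should be non-zero");
        }
        if (!master.getProperty("brokerRole", "").endsWith("MASTER")) {
            problems.add("master brokerRole should be SYNC_MASTER or ASYNC_MASTER");
        }
        if (!"SLAVE".equals(slave.getProperty("brokerRole"))) {
            problems.add("slave brokerRole should be SLAVE");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Trimmed-down versions of broker-a.properties and broker-a-s.properties from this article.
        Properties master = load("brokerClusterName=rocketMqCluster\nbrokerName=broker-a\nbrokerId=0\nbrokerRole=SYNC_MASTER\n");
        Properties slave = load("brokerClusterName=rocketMqCluster\nbrokerName=broker-a\nbrokerId=1\nbrokerRole=SLAVE\n");
        System.out.println(check(master, slave)); // prints [] when no problems are found
    }
}
```

In practice the same check could be fed the real file contents for each pair (broker-a with broker-a-s, broker-b with broker-b-s) before any Broker is started.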
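When several Brokers run on the same server, each one needs its own port number. Also remember to open the ports used by the NameServer and the Brokers in the firewall.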
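When deciding which ports to open, note that a Broker listens on more than its listenPort. The sketch below assumes the RocketMQ 4.x behavior as I understand it, where the VIP channel port is listenPort - 2 and the master-slave replication (HA) port is listenPort + 1; treat both offsets as assumptions and verify them against your version. The class BrokerPorts is a hypothetical helper, not a RocketMQ API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: lists the ports a broker is assumed to use for a given listenPort.
class BrokerPorts {

    /** Returns the ports derived from listenPort, keyed by a descriptive name. */
    static Map<String, Integer> portsFor(int listenPort) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        ports.put("listenPort", listenPort);
        // Assumption: the VIP channel listens two ports below listenPort.
        ports.put("fastListenPort", listenPort - 2);
        // Assumption: master-slave replication listens one port above listenPort.
        ports.put("haListenPort", listenPort + 1);
        return ports;
    }

    public static void main(String[] args) {
        // The two listenPort values used in this article's broker configs.
        for (int listenPort : new int[] {10911, 10921}) {
            System.out.println(listenPort + " -> " + portsFor(listenPort));
        }
    }
}
```

Under these assumptions, each server in this setup would need 9876 (NameServer) plus 10909, 10911, 10912, 10919, 10921 and 10922 open, and the two Brokers on one host do not collide.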
Start each of the four Brokers with its own configuration file:
nohup sh bin/mqbroker -c broker_config_file &
3. RocketMQ visual management console: rocketmq-console
Installing it on server 192.168.50.1 is enough; the console does not need to be clustered.
[root@153-215 local]# git clone https://github.com/apache/rocketmq-externals.git
Cloning into 'rocketmq-externals'...
remote: Enumerating objects: 10, done.
remote: Counting objects: 100% (10/10), done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 9425 (delta 2), reused 1 (delta 0), pack-reused 9415
Receiving objects: 100% (9425/9425), 11.86 MiB | 232.00 KiB/s, done.
Resolving deltas: 100% (4235/4235), done.
[root@153-215 local]# cd rocketmq-externals/
[root@153-215 rocketmq-externals]# ls
dev  README.md  rocketmq-console  rocketmq-docker  rocketmq-flink  rocketmq-flume  rocketmq-hbase  rocketmq-iot-bridge  rocketmq-jms  rocketmq-mysql  rocketmq-php  rocketmq-redis  rocketmq-sentinel  rocketmq-serializer  rocketmq-spark
[root@153-215 rocketmq-externals]# git branch
* master
[root@153-215 rocketmq-externals]# git fetch origin release-rocketmq-console-1.0.0
From https://github.com/apache/rocketmq-externals
 * branch  release-rocketmq-console-1.0.0 -> FETCH_HEAD
[root@153-215 rocketmq-externals]# git checkout -b release-1.0.0 origin/release-rocketmq-console-1.0.0
Branch 'release-1.0.0' set up to track remote branch 'release-rocketmq-console-1.0.0' from 'origin'.
Switched to a new branch 'release-1.0.0'
[root@153-215 rocketmq-externals]# ls
README.md  rocketmq-console
[root@153-215 rocketmq-externals]# ls rocketmq-console/
doc  LICENSE  NOTICE  pom.xml  README.md  src  style
[root@153-215 rocketmq-externals]# vim rocketmq-console/src/main/resources/application.properties
Edit application.properties so that rocketmq.config.namesrvAddr points at both NameServers:
server.contextPath=/rocketmq
server.port=8080
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
Move the rocketmq-console directory, then build and start rocketmq-console:
[root@153-215 rocketmq-console]# mv /usr/local/rocketmq-externals/rocketmq-console /usr/local/rocketmq-console
[root@153-215 rocketmq-console]# cd /usr/local/rocketmq-console/
[root@153-215 rocketmq-console]# ls
doc  LICENSE  NOTICE  pom.xml  README.md  src  style
[root@153-215 rocketmq-console]# mvn clean package -Dmaven.test.skip=true
........
[INFO] Building jar: /usr/local/rocketmq-console/target/rocketmq-console-ng-1.0.0-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:54 min
[INFO] Finished at: 2019-01-11T17:02:34+08:00
[INFO] ------------------------------------------------------------------------
[root@153-215 rocketmq-console]# ls
doc  LICENSE  NOTICE  pom.xml  README.md  src  style  target
[root@153-215 rocketmq-console]# ls target/
checkstyle-cachefile  checkstyle-checker.xml  checkstyle-result.xml  classes  generated-sources  maven-archiver  maven-status  rocketmq-console-ng-1.0.0.jar  rocketmq-console-ng-1.0.0.jar.original  rocketmq-console-ng-1.0.0-sources.jar
[root@153-215 rocketmq-console]# java -jar target/rocketmq-console-ng-1.0.0.jar
.......
[2019-01-11 17:04:15.980]INFO Initializing ProtocolHandler ["http-nio-8080"]
[2019-01-11 17:04:15.991]INFO Starting ProtocolHandler [http-nio-8080]
[2019-01-11 17:04:16.232]INFO Using a shared selector for servlet write/read
[2019-01-11 17:04:16.251]INFO Tomcat started on port(s): 8080 (http)
[2019-01-11 17:04:16.257]INFO Started App in 6.594 seconds (JVM running for 7.239)
4. Test the cluster
Producer test code:
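import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test_group");
        // Both NameServer addresses from section 1, separated by a semicolon
        producer.setNamesrvAddr("192.168.50.1:9876;192.168.50.2:9876");
        try {
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Append the loop index so that each message body is distinct
                Message message = new Message("topic_test", "tag_test",
                        ("Hello World" + i).getBytes(StandardCharsets.UTF_8));
                // send() is synchronous: it returns once the broker has answered
                SendResult sendResult = producer.send(message);
                System.out.println(JSON.toJSON(sendResult));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the producer even if start() or a send fails
            producer.shutdown();
        }
    }
}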
Consumer test code:
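import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SyncConsumerTest {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test_group");
        // Both NameServer addresses from section 1, separated by a semicolon
        consumer.setNamesrvAddr("192.168.50.1:9876;192.168.50.2:9876");
        // A new consumer group starts reading from the earliest available offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            // "*" subscribes to every tag of topic_test
            consumer.subscribe("topic_test", "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (messageList, context) -> {
                System.out.println(Thread.currentThread().getName() + " Receive New Message:" + messageList);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            // start() returns immediately; the consumer threads keep the JVM alive
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}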
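SyncProducerTest run log:
SyncConsumerTest run log:
The logs show that the producer and the consumer send and receive messages normally. Next, open the visual management console at http://192.168.50.1:8080/rocketmq:
The console shows that the two-master, two-slave broker cluster is healthy, and it also lets you see how each broker is handling messages.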