springboot2.0整合rabbitmq
簡介: rabbitmq即一個訊息佇列,主要用來實現應用程式的非同步和解耦,訊息緩衝,訊息分發的作用.
由於rabbitmq依賴於erlang語言,所以先安裝erlang:
新增erlang solutions源 $ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm $ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm $ sudo yum install erlang 複製程式碼
erlang完成後安裝rabbitmq:
先下載rpm:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm 複製程式碼
下載完成後安裝:
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm 複製程式碼
安裝時如果遇到下面的依賴錯誤
Error: Package: socat-1.7.2.3-1.el6.x86_64 (epel) Requires: libreadline.so.5()(64bit) 複製程式碼
可以嘗試先執行
$ sudo yum install socat 複製程式碼
rabbitmq的基本操作:
$ sudo chkconfig rabbitmq-server on # 新增開機啟動RabbitMQ服務 $ sudo /sbin/service rabbitmq-server start # 啟動服務 $ sudo /sbin/service rabbitmq-server status # 檢視服務狀態 $ sudo /sbin/service rabbitmq-server stop # 停止服務 #檢視當前所有使用者 $ sudo rabbitmqctl list_users #檢視預設guest使用者的許可權 $ sudo rabbitmqctl list_user_permissions guest #由於RabbitMQ預設的賬號使用者名稱和密碼都是guest。為了安全起見, 先刪掉預設使用者 $ sudo rabbitmqctl delete_user guest #新增新使用者 $ sudo rabbitmqctl add_user username password #設定使用者tag $ sudo rabbitmqctl set_user_tags username administrator #賦予使用者預設vhost的全部操作許可權 $ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*" #檢視使用者的許可權 $ sudo rabbitmqctl list_user_permissions username 複製程式碼
開啟遠端訪問
預設情況下,RabbitMQ的預設的guest使用者只允許本機訪問, 如果想讓guest使用者能夠遠端訪問的話,只需要將配置檔案中的loopback_users列表置為空即可,如下:
{loopback_users, []} 複製程式碼
另外關於新新增的使用者,直接就可以從遠端訪問的,如果想讓新新增的使用者只能本地訪問,可以將使用者名稱新增到上面的列表, 如只允許admin使用者本機訪問。
{loopback_users, ["admin"]} 複製程式碼
更新配置後,重啟服務.
###springboot2.0整合rabbitmq
pom引入start依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 複製程式碼
application.properties配置:
#rabbitmq config begin # rabbitmq伺服器地址 (預設為127.0.0.1) spring.rabbitmq.host=127.0.0.1 # rabbitmq伺服器連線埠 (預設為5672) spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 #rabbitmq config end 複製程式碼
rabbitmq javabean配置:
package com.snow.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Description: RabbitConfig * @Author: 愛飄de小子 * @CreateDate: 2018/8/15 19:59 * @Version: 1.0 */ @Configuration public class RabbitConfig { /** * 消費者數量,預設10 */ public static final int DEFAULT_CONCURRENT = 10; /** * 每個消費者獲取最大投遞數量 預設50 */ public static final int DEFAULT_PREFETCH_COUNT = 50; /** * 注入 Queue * @return */ @Bean public Queue Queue() { return new Queue("hello"); } /** * 併發消費配置 * @param configurer * @param connectionFactory * @return */ @Bean("pointTaskContainerFactory") public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT); factory.setConcurrentConsumers(DEFAULT_CONCURRENT); configurer.configure(factory, connectionFactory); return factory; } } 複製程式碼
傳送訊息:
/** * 注入AmqpTemplate */ @Autowired private AmqpTemplate rabbitTemplate; public void sendMessage(){ //傳送訊息 this.rabbitTemplate.convertAndSend("hello","你好,rabbitmq"); } 複製程式碼
convertAndSend方法的第一個引數為QueueName,第二個引數為訊息的內容
接收訊息:
/** * 訊息接受 * @param message */ @RabbitListener(queues = "hello")//監聽器監聽指定的QueueName public void receive(String message) { System.out.println("接收訊息:" + message); } 複製程式碼
併發消費:
sendMessage() 傳送訊息後,receive() 接受訊息,此時receive()接受訊息,等待處理完成後,下一個訊息才能進入receive(),如果想要訊息非同步消費,還需配置併發消費:
/** * 訊息接受併發消費 * @param message */ @RabbitListener(queues = "hello",containerFactory = "pointTaskContainerFactory") public void receive(String message) { System.out.println("接收訊息:" + message); } 複製程式碼
@RabbitListener註解中的containerFactory 是RabbitConfig配置的pointTaskContainerFactory,可以自定義如下引數:
/** * 消費者數量,預設10 */ public static final int DEFAULT_CONCURRENT = 10; /** * 每個消費者獲取最大投遞數量 預設50 */ public static final int DEFAULT_PREFETCH_COUNT = 50; 複製程式碼