rabbitmq學習(七) —— springboot下的可靠使用
前面的學習都是基於原生的api,下面我們使用spingboot來整合rabbitmq
springboot對rabbitmq提供了友好支援,極大的簡化了開發流程
引入maven
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
配置yml
rabbitmq: host: 47.102.103.232 port: 5672 username: admin password: admin virtual-host: /test publisher-confirms: true publisher-returns: true cache: channel: size: 10 listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 3 retry: enabled: true
這是基礎的配置,看不懂的配置後面會介紹
更詳細的配置參考官方 https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-rabbitmq (搜尋rabbit往下拉即可)
程式碼實現
配置類
@Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("helloQueue"); } //建立topic交換機 @Bean public TopicExchange helloExchange() { return new TopicExchange("helloExchange"); } @Bean public Binding bindingPaymentExchange(Queue helloQueue, TopicExchange helloExchange) { return BindingBuilder.bind(helloQueue).to(helloExchange).with("hello.#"); } /** * 定製化amqp模版 * connectionFactory:包含了yml檔案配置引數 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 必須設定為 true,不然當 傳送到交換器成功,但是沒有匹配的佇列,不會觸發 ReturnCallback 回撥 // 而且 ReturnCallback 比 ConfirmCallback 先回調,意思就是 ReturnCallback 執行完了才會執行 ConfirmCallback rabbitTemplate.setMandatory(true); // 設定 ConfirmCallback 回撥yml需要配置 publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { // 如果傳送到交換器都沒有成功(比如說刪除了交換器),ack 返回值為 false // 如果傳送到交換器成功,但是沒有匹配的佇列(比如說取消了繫結),ack 返回值為還是 true (這是一個坑,需要注意) if (ack) { String messageId = correlationData.getId(); System.out.println("confirm:"+messageId); } }); // 設定 ReturnCallback 回撥yml需要配置 publisher-returns: true // 如果傳送到交換器成功,但是沒有匹配的佇列,就會觸發這個回撥 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String messageId = message.getMessageProperties().getMessageId(); System.out.println("return:"+messageId); }); return rabbitTemplate; } }
回撥機制
- 訊息不管是否投遞到交換機都進行 ConfirmCallback回撥,投遞成功ack=true,否則為false
- 交換機匹配到佇列成功則不進行 ReturnCallback回撥,否則先進行 ReturnCallback回撥再進行 ConfirmCallback回撥
- 如果訊息成功投遞到交換機,但沒匹配到佇列,則 ConfirmCallback回撥ack仍為true
生產者
@Component public class RbProducer { //注意一定要使用RabbitTemplate!! //雖然RabbitTemplate實現了AmqpTemplate 但是AmqpTemplate裡並沒有能傳送correlationData的方法 @Resource private RabbitTemplate rbtemplate; public void send1(String msg){ //CorrelationData用於confirm機制裡的回撥確認 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rbtemplate.convertAndSend("helloExchange", "hello.yj", msg,correlationData); } public void send2(User user){ CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rbtemplate.convertAndSend("helloExchange", "hello.yj", user,correlationData); } }
消費者
@Component @RabbitListener(queues = "helloQueue") public class RbConsumer { @RabbitLister(queues = "helloQueue") public void receive0(Message msg,Channel channel) throws IOException { System.out.println("consumer receive message0: " + msg); channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } @RabbitHandler public void receive1(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("consumer receive message1: " + msg); channel.basicAck(deliveryTag, false); } @RabbitHandler public void receive2(User user, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("consumer receive message2: "+user); //如果發生以下情況投遞訊息所有的通道或連線被突然關閉(包括消費者端丟失TCP連線、消費者應用程式(程序)掛掉、通道級別的協議異常)任何已經投遞的訊息但是沒有被消費者端確認的訊息會自動重新排隊。 //請注意,連線檢測不可用客戶端需要一段時間才會發現,所以會有一段時間內的所有訊息會重新投遞 //因為訊息的可能重新投遞,所有必須保證消費者端的介面的冪等。 //在RabbitMQ中影響吞吐量最大的引數是:訊息確認模式和Qos預取值 //自動訊息確認模式或設定Qos預取值為無限雖然可以最大的提高訊息的投遞速度,但是在消費者端未及時處理的訊息的數量也將增加,從而增加消費者RAM消耗,使用消費者端奔潰。所以以上兩種情況需要謹慎使用。 //RabbitMQ官方推薦Qos預取值設定在 100到300範圍內的值通常提供最佳的吞吐量,並且不會有使消費者奔潰的問題 channel.basicAck(deliveryTag, false); channel.basicQos(100); // 代表消費者拒絕一條或者多條訊息,第二個引數表示一次是否拒絕多條訊息,第三個引數表示是否把當前訊息重新入隊 // channel.basicNack(deliveryTag, false, false); // 代表消費者拒絕當前訊息,第二個引數表示是否把當前訊息重新入隊 // channel.basicReject(deliveryTag,false); } }
@RabbitListener+@RabbitHandler:消費者監聽
使用@RabbitListener+@RabbitHandler組合進行監聽,監聽器會根據佇列發來的訊息型別自動選擇處理方法
channel.basicAck(deliveryTag, false):手動確認機制
deliverTag:該訊息的標識,每來一個訊息該標識+1
multiple:第二個引數標識書否批量確認
requeue:被拒絕的是否重新入隊
channel.basicQos(100):最多未確認的訊息數量為100,超過100佇列將停止給該消費者投遞訊息
更多引數詳解參考 https://www.cnblogs.com/piaolingzxh/p/5448927.html
測試
@RunWith(SpringRunner.class) @SpringBootTest(classes = TestBoot.class) public class TestRabbit { @Resource private RbProducer producer; @Test public void send1() { producer.send1("hello,im a string"); } @Test public void send2() { User user = new User(); user.setNickname("hello,im a object"); producer.send2(user); } }
成功消費
完結
下篇部落格我們討論下在擁有了手動ack機制、confirm機制、return機制後,是否真的可靠~