保证消息不丢失,可靠抵达,可以使用确认机制
publisher confirmCallback 确认模式
publisher returnCallback 未投递到queue 退回模式
consumer ack 机制

修改publisher服务中的application.yml文件
- spring:
- rabbitmq:
- publisher-confirms: true
- publisher-confirm-type: correlated
- publisher-returns: true
- template:
- mandatory: true
说明:
publish-confirm: 开启publish-confirm功能,确认回调publish-confirm-type:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirm结果,直到超时correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
publish-confirm 确认回调作用:确保消息进入交换机
发送消息
- package com.hdb.pingmoweb.order;
-
- import lombok.extern.slf4j.Slf4j;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- import java.util.UUID;
-
- @Slf4j
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class PingmowebOrderApplicationTests {
-
- @Test
- public void contextLoads() {
- }
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessageExchange() throws InterruptedException {
- // 1.消息体
- String message = "hello, spring amqp!";
- // 2.全局唯一的消息ID,需要封装到CorrelationData中
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- // 3.添加callback
- correlationData.getFuture().addCallback(
- result -> {
- if(result.isAck()){
- // 3.1.ack,消息成功
- log.info("消息发送成功, ID:{}", correlationData.getId());
- }else{
- // 3.2.nack,消息失败
- log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
- }
- },
- ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
- );
- // 4.发送消息
- rabbitTemplate.convertAndSend("itcast.direct", "red", message, correlationData);
- // 休眠一会儿,等待ack回执
- //Thread.sleep(2000);
- }
-
- }
接收消息
- package com.hdb.pingmoweb.order.mq;
-
- import org.springframework.amqp.core.ExchangeTypes;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
- key = {"red","blue"}
- ))
- public void listenDirectQueue1(String msg){
- System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
- key = {"red"}
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
- }
-
- }
publish-returns 消息进入队列回调配置RabbitTemplate,当routingKey找不到指定队列回调ReturanCallback
- package com.hdb.pingmoweb.order.mq;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- @Slf4j
- @Configuration
- public class MyRabbitConfig {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void initRabbitTemplate(){
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- // 投递失败,记录日志
- log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
- replyCode, replyText, exchange, routingKey, message.toString());
- // 如果有业务需要,可以重发消息
- });
- }
-
- }
SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- none 模式下,消息投递是不可靠的,可能丢失
- auto 模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual 自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
修改consumer服务的application.yml文件,添加下面内容:
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: auto # 关闭ack
消费者出现异常,消息不会被删除
同时可以使用manual ack机制手动ack
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
- key = {"red"}
- ))
- public void listenDirectQueue2(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
- System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
- if (msg.contains("success")) {
- // RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
- channel.basicAck(deliveryTag, false);
- } else {
- // 第三个参数true,表示这个消息会重新进入队列
- channel.basicNack(deliveryTag, false, true);
- }
- }
deliveryTag: 相当于消息的唯一标识,用于 mq 辨别是哪个消息被 ack/nak 了channel: mq 和 consumer 之间的管道,通过它来 ack/nak- channel.basicAck(deliveryTag, false); 成功确认
- channel.basicNack(deliveryTag, false, true); 失败返回队列