为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制
(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制,只要将自动ack设置为false,消费者有足够的时间处理消息数据,消费者可以在处理完后续的业务逻辑后再进行提交ack,确保消息确实是被消费了,防止服务宕机可能导致的消息丢失。而MQ会一直等待消费者手动提交ack!!
在rabbitmq管理页面上可以详细看到队列中的消息情况:
自动ACK: 消费者
配置中如果是自动ack机制,MQ将消息发送给消费者后直接就将消息给删除了
,这个的前提条件是消费者程序没有出现异常,如果消费者接收消息后处理时出现异常,那么MQ将会尝试重发消息给消费者直至达到了消费者服务
中配置的最大重试次数后将会直接抛出异常不再重试。
手动ACK:消费者
设置了手动ACK机制后,可以显式的提交/拒绝消息(这一步骤叫做发送ACK),如果消息被消费后正常被提交了ack,那么此消息可以说是流程走完了,然后MQ将此消息从队列中删除。而如果消息被消费后被拒绝了,消费者可选择让MQ重发此消息或者让MQ直接移除此消息。后面可以使用死信队列来进行接收这些被消费者拒绝的消息,再进行后续的业务处理。
RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。
其中发送方确认又分为:生产者到交换机到确认、交换机到队列的确认。(借用下大佬的图)
ConfirmCallback()方法,是一个回调方法,生产者将消息发送给Broker(RabbitMQ服务),然后Broker给回调生产者的ConfirmCallback()方法告知生产者消息是否接收到。也就是确认消息是否正常到达 Exchange 中。
# 我们需要在生产者中添加配置,表示开启发布者确认(注意新旧版本)
spring.rabbitmq.publisher-confirm-type=correlated # 新版本
spring.rabbitmq.publisher-confirms=true # 老版本
ReturnCallback()方法同样是一个回调方法,是交换机和队列之间的消息确认方式。启动消息失败返回,此方法是在交换器路由不到队列时触发回调,这个可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了
# 在生产者中配置,表示发布者返回
spring.rabbitmq.publisher-returns=true
使用
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
# 消息确认(ACK)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
理解:springboot中需要给RabbitTemplate设置一些方法的回调即可。
通常情况下我们可以直接在配置类中设置好这些东西,但是可能由于某些业务需求,并不是所有的消息都使用常用的方式,也可以将我们的消息发送服务实现接口然后重写这些回调。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//确认消息送到交换机(Exchange)回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("\n确认消息送到交换机(Exchange)结果:");
System.out.println("相关数据:" + correlationData);
System.out.println("是否成功:" + ack);
System.out.println("错误原因:" + cause);
});
//确认消息送到队列(Queue)回调
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
});
return rabbitTemplate;
}
以接口的形式访问发送一下。注意:确认消息送到队列(Queue)回调,只有在出现错误时才回调。
将发送的服务类实现接口,实现回调
@Service
public class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
private static Logger logger = LoggerFactory.getLogger(SendMessageService.class);
@Autowired
public RabbitTemplate rabbitTemplate;
public void sendMessage(String str){
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setConfirmCallback(this);
// CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的
rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString()));
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("sender return success" + returnedMessage.toString());
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
if (!b) {
logger.error("消息发送异常!");
// 进行处理
} else {
logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
}
}
}
接口方式访问下。没问题
需要注意的是:配置类方式和局部方式只能选择其一,如果一个RabbitTemplate设置了两个或者多个ConfirmCallback/ReturnCallback,会报错的不支持。类似这样的报错:Only one ConfirmCallback/ReturnCallback is supported by each RabbitTemplate。在开发过程中需要注意!!
消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。
RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
消息确认模式有:
消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。
配置,注意是simple模式的ack还是direct模式,或者两个都设置上
server:
port: 9000
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
# 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673
消费者接收数据
其中在调用basiAck或basicNack时必须要携带一个tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。而在接收者方法上使用@Header(AmqpHeaders.DELIVERY_TAG)
可以直接获取到这个tag。
@Component
public class MQConsumer {
@Autowired
private DispatcherService dispatcherService;
@RabbitListener(queues = "order.queue")
public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
System.out.println("消息:" + orderMsg);
JSONObject order = JSONObject.parseObject(orderMsg);
String orderId = order.getString("orderId");
// 派单处理
dispatcherService.dispatch(orderId);
System.out.println(1 / 0); // 出现异常
// 手动确认
channel.basicAck(tag, false);
} catch (Exception e) {
// 如果出现异常的情况下 根据实际情况重发
// 重发一次后,丢失
// 参数1:消息的tag
// 参数2:多条处理
// 参数3:重发
// false 不会重发,会把消息打入到死信队列
// true 重发,建议不使用try/catch 否则会死循环
// 手动拒绝消息
channel.basicNack(tag, false, false);
}
}
}
消息的接收者也可使用普通类实现ChannelAwareMessageListener
接口,重写方法完成,这种是直接全局性接收的。没有最好的,只有最合适的,根据项目情况选择全局接收还是单个类接收自己监听的。
/**
* 接收者
*
**/
@Component
public class Consumer implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
{
System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
System.out.println("执行queue_name中的消息的业务处理流程......");
}
if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
{
System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
System.out.println("执行fanout.A中的消息的业务处理流程......");
}
// 手动提交ack,并且批量确认消息
channel.basicAck(deliveryTag, true);
}
catch (Exception e)
{
e.printStackTrace();
/**
* 拒绝消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
channel.basicReject(deliveryTag, true);
}
}
}
参考自