RabbitMQ如何保证消息的可靠性:
1.从生产者到消息队列,congfirm模式(与事务相比confirm模式最大的优势是异步)通过消息确认机制来保证,通过给每个指派唯一标志,完成消费后返回ack确认, 2 消息队列 消息队列的持久化,包括交换机持久化,消息队列持久化以及消息持久化 3.消费者 消费完返回完后手动ack确认,开启RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。 若长时间没有ack确认会像下一个消费者发送任务,同时需要做等幂性处理。
RabbitMQ 消息丢失的源头主要有以下三个:
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
一般采用异步confirm,所以发送消息之前,需要把消息存起来。所以我们需要为消息分配全局唯一的Id,与消息内容一一对应。
生产者也需要监听Broker发送的通知,根据ack / nack 进行确认。
而具体实现时,需要注意:
rabbitmq:
# 开启发送确认
publisher-confirm-type: correlated
@Service
@Slf4j
public class MqSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送秒杀信息
public void sendSeckillMessage(String msg){
log.info("发送消息:" + msg);
rabbitTemplate.convertAndSend("seckillExchange", "seckill.message", msg);
}
}
//连接工厂
@Resource
private CachingConnectionFactory cachingConnectionFactory;
//向spring容器中注入RabbitTemplate对象,并配置开启确认消息回调和消息失败回调
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
/**
* 消息确认回调,确认消息是否到达broker
* correlationData:消息唯一标识
* ack:确认结果,消息是否发送成功
* cause:失败原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String msgId = correlationData.getId();
if (ack) {
LOGGER.info("消息发送成功============>" + msgId);
//监听消息是否发送成功,如果成功将状态改为1
// mailLogService.update(new UpdateWrapper().set("status", 1).eq("msgId", msgId));
} else {
LOGGER.error("消息发送到queue失败============>" + msgId);
}
});
return rabbitTemplate;
}
RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,需要给exchange、queue和message都进行持久化。
设置持久化,两个步骤:必须都设置。
无法保障100%不丢失,因为可能持久化完成前就宕机。
(若设置持久化,会在持久化完成后再发出ack)
@Bean
public Queue queue(){
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:该队列是否只供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能-一个消费者消费
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
return new Queue(QUEUE);
}
//绑定correlationId、以及消息持久化
rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message, Correlation correlation) {
MessageProperties messageProperties = message.getMessageProperties();
if (correlation instanceof CorrelationData) {
String correlationId = ((CorrelationData) correlation).getId();
messageProperties.setCorrelationId(correlationId);
}
// 持久化处理
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
消息入库,顾名思义就是将要发送的消息保存到数据库中。
如何保证消费成功(或限流处理):对应消费端丢数据,即消费完成前,宕机
RabbitMQ默认是自动ack的,缺点:
因此使用手动ack确认
spring:
rabbitmq:
listener:
simple:
# 消费者最小数量
concurrency: 10
# 消费者最大数量
max-concurrency: 10
# 限制消费者每次处理消息的数量(预抓取:等价于缓冲区大小,限流关键)
prefetch: 1
# manual:手动确认
acknowledge-mode: manual
@RabbitListener(queues = "seckillQueue")
@RabbitHandler
public void receiveDeadMsg(String msg, Channel channel, Message messages) throws Exception {
try {
log.info("接收消息:" + msg);
VoucherOrder voucherOrder = JsonUtil.jsonStr2Object(msg, VoucherOrder.class);
voucherOrderService.createVoucherOrder(voucherOrder);
// 回发ack,第二个参数multiple,表示是否批量确认
channel.basicAck(messages.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 回发Nack,且重回队列为false,防死循环
channel.basicNack(messages.getMessageProperties().getDeliveryTag(), false, false);
}
}
重复消费的原因:正常情况,消费完毕,RabbitMQ会回复一个确认消息给消息队列,消息队列就把这个消息删除了,但是因为宕机或者网络导致确认消息没返回成功,消息队列不知道自己消费过该消息,会将消息再次分发。
解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;
解决方法: