生产者生产消息到消费者消息消费,中间需要生产者将消息发送到交换器,再由交换器路由到队列存储,然后消费者进行消息消费。在没有任何设置情况下,中间可能存在以下几种情况导致消息丢失:

针对上述情况,本文将根据每个节点讲述如何操作确保消息投递的可靠性,同时在保障可靠性的情况下可能会引发系列如消息重复等问题,也是本文将会涉及到的重点
理解数据库的事务就是将多次操作原子化,统一提交回滚实现数据一致性。RabbitMQ事务可能与之前数据库等事务有一定差别,当然也是支持消息的提交与回滚操作。事务是解决生产者发送消息到交换器消息丢失问题的方案之一
RabbitMQ中的事务实现有如下两个个主要步骤:

// 将信道设置为事务模式
channel.txSelect();
// 发送消息
try {
channel.basicPublish(bindingKey, bindingKey, false, false, null, messgae.getBytes("UTF-8"));
// 提交事务
channel.txCommit();
}catch (Exception e){
// 异常回滚事务,发生异常的时候可以尝试重发或者记录等操作
channel.txRollback();
}
事务机制的确认需要等待上一条消息发送完毕反馈之后才能进行第二条消息发送,这样的操作将会对于使用RabbitMQ而言感觉就是暴殄天物。如果是想要发送多条消息只能循环操作,但是注意如果没有将channel信道设置为事务模式不能进行事务操作,不然会抛出异常。最后一点就是信道设置为事务模式只需要操作一次即可
事务机制对于RabbitMQ性能消耗是灾难性的,针对生产者到交换器消息丢失处理提出了全新的轻量级处理方式。发送发确认机制confirm,该操作编码上会有很多地方都在讲什么等待确认、批量确认、异步确认。一切为了生产,所以本文将只会介绍生产用的异步确认方式
RabbitMQ的生产发送确认实现由以下三个部分构成:
// 设置confirm消息发送确认机制
channel.confirmSelect();
// 增加确认机制监听器
channel.addConfirmListener(new ConfirmListener() {
/**
* 成功确认
* deliveryTag 表示消息的唯一标识
* multiple 本次确认是否为批量操作
*/
@Override
public void handleAck (long deliveryTag, boolean multiple) throws IOException {
// 删除有序集合中的消息
if (! multiple){
// 根据坐标删除消息
}else {
// 批量删除消息
}
}
/**
* 失败确认
* deliveryTag 表示消息的唯一标识
* multiple 本次确认是否为批量操作
*/
@Override
public void handleNack (long deliveryTag, boolean multiple) throws IOException {
// 可以根据deliveryTag做重试操作等
}
});
