目录
2.2.1 Exchange发送至queue过程-Return机制
经过前面的学习,我们终于来到一个跟我们实际开发应用场景贴合度非常高的话题了。-消息丢失。
这也是我们之前提到的引入MQ后系统的缺点,可用性降低,复杂度提升,那我们这次就着手看实际应用中如何规避这些问题,首先来解决消息丢失的问题。
消息是如何丢的呢?在搞清楚这个问题之前,我们需要再复习下,消息从生产者发出到消费者整个的过程,如下图:
第一步:生产者发送到Exchange,这个过程可能由于网络抖动等原因,消息未能发送到Exchange,或者发送到了Exchange,而生产者没有得到发送成功的反馈
第二步:Exchange发送消息路由至Queue,这一步可能会因为mq内部问题,宕机等原因,导致Exchange并未将消息推送至Queue,而此时,生产者认为已将消息成功送达了。
第三步:Queue存储问题,我们知道Exchange是不存储消息的,但Queue是存储的,这里面就涉及到这个消息是否要持久化,还是放内存,mq出问题时会导致消息丢失
第四步:消费者消费的过程,从Queue到消费者的传输过程中,或消费的时候可能因为各种原因出现异常,未能按预期的程序逻辑将消息执行完,也作为消息丢失的一种。
那我们怎么解决呢?我们还是按照上面介绍的4个步骤分别去控制消息防丢,最终实现整体消息的一个完整消费过程。
2.1.1 事务
我们很容易想到,开启事务不就行了么,成功发送到Queue后事务提交,中间出现任何异常,事务回滚,一定万无一失,确保消息能完整的送到Queue中去。
的确rabbitmq提供了事务机制,我们看下核心代码:-基于java amqp agent
- /*
- *开启事务
- * */
- channel.txSelect();
-
- /**提交事务
- * */
- channel.txCommit();
-
- /**回滚事务
- * */
- channel.txRollback();
但实际应用中呢,我们一般不采取这种方式,系统中引入mq的一个原因是想让系统处理更快,加入rabbitmq的事务后,性能急剧下降,mq失去了它原有的轻盈,快速,变成了一只老年兔子,很稳但太慢。
2.1.2 Confirm机制
RabbitMQ提供了Confirm机制。比事务效率高。分为:普通Confirm方式、批量Confirm方式、异步Confirm方式。使用Confirm机制后,mq会明确的告诉你,生产者一定是成功把消息发送到了Exchange,【注意:不是到queue】,
若返回失败,我们可以根据实际业务情况进行处理,是记录到日志后后续重试,还是立马重试,重试几次后做记录等等
上代码:
2.1.2.1 普通Confirm方式
针对普通的一条消息的发送成功后的确认
-
- import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.MessageProperties;
-
- import java.io.IOException;
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 1.普通 confirm 模式
- * @createTime 2022/07/27 19:34:00
- */
- public class NormalConfirmProducer {
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
- sendMsg(channel);
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel) throws IOException, InterruptedException {
- //开启确认机制
- channel.confirmSelect();
- //3、发送消息到exchange
- String msg = "hello confirm";
-
- channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
-
- if (channel.waitForConfirms()) {
- System.out.println("生产者发布消息至Exchange成功!");
- } else {
- System.out.println("生产者发布消息至Exchange失败!请重试");
- }
- }
-
- }
2.1.2.2 批量Confirm方式
针对一批消息的确认,虽然性能较高了,但控制就不是很精准了,使用时自己权衡。与单笔确认区别:就是控制确认的时机发生了变化,单笔确认就是一条一确认。
-
- import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.MessageProperties;
-
- import java.io.IOException;
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 2.批量 confirm 模式
- * @createTime 2022/07/27 19:34:00
- */
- public class BatchConfirmProducer {
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
- sendMsg(channel);
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel) throws IOException, InterruptedException {
- //开启确认机制
- channel.confirmSelect();
- //3、发送消息到exchange
- String msg = "hello confirm";
- for (int i = 0; i < 10; i++) {
- channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
- }
-
- if (channel.waitForConfirms()) {
- System.out.println("生产者发布消息至Exchange成功!");
- } else {
- System.out.println("生产者发布消息至Exchange失败!请重试");
- }
- }
-
- }
2.1.2.3 异步Confirm方式
有的小伙伴会说,这还是慢啊,我发条消息还得等结果,发送性能难以保证,好,提供异步的方式,先往里面发,注册一个监听,靠异步返回的形式来确认消息的确发送成功了,若收到消息会有明确的成功失败,一直收不到监听返回,也可认为发送失败了
-
- import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConfirmListener;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.MessageProperties;
-
- import java.io.IOException;
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 3.异步confirm 模式
- * @createTime 2022/07/27 19:34:00
- */
- public class AsynConfirmProducer {
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
- sendMsg(channel);
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel) throws IOException, InterruptedException {
- //开启确认机制
- channel.confirmSelect();
- //3、发送消息到exchange
- String msg = "hello confirm";
- for (int i = 0; i < 10; i++) {
- channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
- }
-
- //3.3、开启异步回调
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("生产者发布消息至Exchange成功,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
- }
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("生产者发布消息至Exchange失败,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
- }
- });
- System.in.read();
- }
-
- }
好,保证了消息成功稳定的抵达了Exchange,那从Exchange路由到queue如何保证呢,请出 Return机制-采用Return机制来保证消息是否由exchange发送到了queue中。
-
- import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 3.异步confirm 模式
- * @createTime 2022/07/27 19:34:00
- */
- public class AsynConfirmReturnProducer {
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
- sendMsg(channel);
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel) throws IOException, InterruptedException {
- //开启确认机制
- channel.confirmSelect();
- //3、发送消息到exchange
- String msg = "hello confirm";
- for (int i = 0; i < 5; i++) {
- //3.2、发送消息,第三个设置为true才会有Return机制,默认为false
- //使用void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
- channel.basicPublish("", "no-lost", true, MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
- }
-
- //3.3、开启异步回调
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("生产者发布消息至Exchange成功,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
- }
-
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("生产者发布消息至Exchange失败,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
- }
- });
-
- //3.1.2、开启Return机制
- channel.addReturnListener(new ReturnListener() {
- @Override
- public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //当送达失败是才会回调
- System.out.println(new String(body, "utf-8") + ",消息没有送达到queue中");
- }
- });
-
- System.in.read();
- }
-
- }
好的,太不容易了,消息终于抵达了queue中,我们要保证消息不丢,需要将消息持久化,放内存中是不保险的, 这里有会有两个问题,
第一 发送的消息是有属性的,需要设置 持久化标识。
第二 创建的队列也是由属性的,也需要设置持久化,
这样才保证了消息的确持久化到了磁盘中。
- //deliveryMode =2 持久化消息
-
- channel.basicPublish("", "persist-show", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
-
- /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
- public static final BasicProperties PERSISTENT_TEXT_PLAIN =
- new BasicProperties("text/plain",
- null,
- null,
- 2,
- 0, null, null, null,
- null, null, null, null,
- null, null);
-
- //声明队列durable=true
- channel.queueDeclare("persist-show", true, false, false, null);
-
- Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
- Map
arguments) throws IOException;
有的小伙伴会问,这样就会存储到磁盘么,如何证明?
看面板:
看磁盘存储
文件内容:
完整生产者代码:
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 持久化
- * C:\Users\xxx\AppData\Roaming\RabbitMQ
- * @createTime 2022/07/27 19:34:00
- */
- public class PersistProducer {
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
-
- channel.queueDeclare("persist-show", true, false, false, null);
-
- for (int i = 0; i < Integer.MAX_VALUE; i++) {
- try {
- sendMsg(channel);
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("生产者发布消息失败!请重试");
- }
- Thread.sleep(1000);
- }
-
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel) throws IOException, InterruptedException {
- //3、发送消息到exchange
- String msg = "i am a persist message";
- channel.basicPublish("", "persist-show", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
- System.out.println("生产者发布消息成功");
- }
-
- }
太不容易了,整了半天终于到消费了,前面所有的努力都是为成功的消费呀,消费这里确保消费成功咯。要回到前面所讲过的 消费端的ACK了。【rabbitmq模式中-WORK模式提及】
也就是开启消费端的手工确认机制-每次成功消费消息后,明确告知mq,消费成功了,你可以删除掉这条消息了。
若果没告知mq,那这条消息就一直会处于unacked状态,等消费者重启会再次消费。
所以消费完成后一定要明确告知 Broker,是 Ack ,还是NAck 还是Reject 了。
2.3.1 消费确认异常再次放回队列执行,【可设定次数-N次之后就不要重试了-直接Ack了】
-
- import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
- import com.rabbitmq.client.*;
- import lombok.SneakyThrows;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 消息确认
- * @createTime 2022/07/27 19:36:00
- */
- public class ConfirmConsumer1 {
-
- private static final Map
errorMap = new HashMap<>(); -
- //消费者
- public static void main(String[] args) throws Exception {
- //1、获取连对象、
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
-
- //3、创建队列
- channel.queueDeclare("no-lost", true, false, false, null);
-
- //4.开启监听Queue
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @SneakyThrows
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收到消息:" + new String(body, "UTF-8"));
- try {
- //具体业务
- int i = 1 / 0;
- //确认
- channel.basicAck(envelope.getDeliveryTag(), false);
- } catch (Exception e) {
- if (errorMap.get(new String(body, "UTF-8")) != null) {
- System.out.println("消息已重复处理失败,拒绝再次接收...");
- channel.basicReject(envelope.getDeliveryTag(), false);
- } else {
- System.out.println("消息即将再次返回队列处理...");
- channel.basicNack(envelope.getDeliveryTag(), false, true);
- errorMap.put(new String(body, "UTF-8"), 1);
- }
- }
- }
- };
- channel.basicConsume("no-lost", false, consumer);
- System.out.println("消费者开始监听队列");
-
- //5、键盘录入,让程序不结束!
- System.in.read();
-
- //6、释放资源
- channel.close();
- connection.close();
- }
-
- }
2.3.2 消费确认异常后直接ACK,会再finally块,记录此次异常记录,后续和发送端结合再次发起重试
-
- import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
- import com.rabbitmq.client.*;
- import lombok.SneakyThrows;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description 消息确认
- * @createTime 2022/07/27 19:36:00
- */
- public class ConfirmConsumer2 {
-
-
- //消费者
- public static void main(String[] args) throws Exception {
- //1、获取连对象、
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
-
- //3、创建队列
- channel.queueDeclare("no-lost", true, false, false, null);
-
- //4.开启监听Queue
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @SneakyThrows
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收到消息:" + new String(body, "UTF-8"));
- try {
- //具体业务
- int i = 1 / 0;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- //确认
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- channel.basicConsume("no-lost", false, consumer);
- System.out.println("消费者开始监听队列");
-
- //5、键盘录入,让程序不结束!
- System.in.read();
-
- //6、释放资源
- channel.close();
- connection.close();
- }
-
- }
2.3.3 消费异常后使用死信队列-再死信队列里单独处理
死信队列后续单独介绍
原理也很简单就是将未消费的消息,再次转发到了一个Exchange,Exchange再路由到对应的Queue中去而已。
至此,兄弟们,若想保证消息不丢失,我们需要做这么多事情,各种环节,保障各种异常情况。是不是头都大了。代码复杂度高,性能也没法保证。
真正的实际应用中有各个环节都保证消息防丢的使用么?有的同学可以留言讨论下。
本人在实际应用中,包括看大部分的同学的代码,都不会做这么多的机制去保证消息防丢,最多是做一个消费端的ACK。
甚至很多同学把MQ这里面的内容当做一个黑盒,或根本就不了解MQ里面的原理,只需要知道发送和接收,就能用起来,但发生了问题也必将麻烦。
推荐方式:预警+补偿=最终一致性
这里面我们引入一个实际的案例:
场景描述:发红包,领取红包的人若超过一定限额,将红包退回。【不用关注是否合理】
那这里面我实际没关注,mq过程中任何保证消息丢失的措施,完全把mq黑盒化。
最核心的内容是 设定了一个超退状态,消费完成后把这个状态置为已完成即可,
若此状态在一定时间后【例如5分钟】还没发生变化,那我们会JOB 扫描到再次发起重试,做好幂等即可。
仍补偿多次补偿不成功的,可以记录 超退失败,通过预警机制预警处理,人为干预即可。
当然场景不同,运用方式不同,还是需要结合自己实际的业务,选择合适的方式,来保证消息的准确、稳定、及时的投递。