• RabbitMQ初步到精通-第五章-RabbitMQ之消息防丢失


    目录

    第五章-RabbitMQ之消息防丢失

    1.消息是如何丢的

    ​编辑

    2.如何控制消息丢失

    2.1 生产者发送消息到Broker过程

    2.2 Broker内部过程

            2.2.1 Exchange发送至queue过程-Return机制

            2.2.2 queue存储过程

    2.3 消费者消费过程-消费端确认

    3.最佳实践


    第五章-RabbitMQ之消息防丢失

    1.消息是如何丢的

    经过前面的学习,我们终于来到一个跟我们实际开发应用场景贴合度非常高的话题了。-消息丢失。

    这也是我们之前提到的引入MQ后系统的缺点,可用性降低,复杂度提升,那我们这次就着手看实际应用中如何规避这些问题,首先来解决消息丢失的问题。

    消息是如何丢的呢?在搞清楚这个问题之前,我们需要再复习下,消息从生产者发出到消费者整个的过程,如下图:

     第一步:生产者发送到Exchange,这个过程可能由于网络抖动等原因,消息未能发送到Exchange,或者发送到了Exchange,而生产者没有得到发送成功的反馈

    第二步:Exchange发送消息路由至Queue,这一步可能会因为mq内部问题,宕机等原因,导致Exchange并未将消息推送至Queue,而此时,生产者认为已将消息成功送达了。

    第三步:Queue存储问题,我们知道Exchange是不存储消息的,但Queue是存储的,这里面就涉及到这个消息是否要持久化,还是放内存,mq出问题时会导致消息丢失

    第四步:消费者消费的过程,从Queue到消费者的传输过程中,或消费的时候可能因为各种原因出现异常,未能按预期的程序逻辑将消息执行完,也作为消息丢失的一种。

    2.如何控制消息丢失

    那我们怎么解决呢?我们还是按照上面介绍的4个步骤分别去控制消息防丢,最终实现整体消息的一个完整消费过程。

    2.1 生产者发送消息到Broker过程

    2.1.1 事务

    我们很容易想到,开启事务不就行了么,成功发送到Queue后事务提交,中间出现任何异常,事务回滚,一定万无一失,确保消息能完整的送到Queue中去。

    的确rabbitmq提供了事务机制,我们看下核心代码:-基于java amqp agent

    1. /*
    2. *开启事务
    3. * */
    4. channel.txSelect();
    5. /**提交事务
    6. * */
    7. channel.txCommit();
    8. /**回滚事务
    9. * */
    10. channel.txRollback();

    但实际应用中呢,我们一般不采取这种方式,系统中引入mq的一个原因是想让系统处理更快,加入rabbitmq的事务后,性能急剧下降,mq失去了它原有的轻盈,快速,变成了一只老年兔子,很稳但太慢。

    2.1.2 Confirm机制

    RabbitMQ提供了Confirm机制。比事务效率高。分为:普通Confirm方式、批量Confirm方式、异步Confirm方式。使用Confirm机制后,mq会明确的告诉你,生产者一定是成功把消息发送到了Exchange,【注意:不是到queue】,

    若返回失败,我们可以根据实际业务情况进行处理,是记录到日志后后续重试,还是立马重试,重试几次后做记录等等

    上代码:

    2.1.2.1 普通Confirm方式

    针对普通的一条消息的发送成功后的确认

    1. import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.MessageProperties;
    5. import java.io.IOException;
    6. /**
    7. * @author rabbit
    8. * @version 1.0.0
    9. * @Description 1.普通 confirm 模式
    10. * @createTime 2022/07/27 19:34:00
    11. */
    12. public class NormalConfirmProducer {
    13. //生产者
    14. public static void main(String[] args) throws Exception {
    15. //1、获取connection
    16. Connection connection = RabbitCommonConfig.getConnection();
    17. //2、创建channel
    18. Channel channel = connection.createChannel();
    19. sendMsg(channel);
    20. //4、关闭管道和连接
    21. channel.close();
    22. connection.close();
    23. }
    24. private static void sendMsg(Channel channel) throws IOException, InterruptedException {
    25. //开启确认机制
    26. channel.confirmSelect();
    27. //3、发送消息到exchange
    28. String msg = "hello confirm";
    29. channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    30. if (channel.waitForConfirms()) {
    31. System.out.println("生产者发布消息至Exchange成功!");
    32. } else {
    33. System.out.println("生产者发布消息至Exchange失败!请重试");
    34. }
    35. }
    36. }

    2.1.2.2 批量Confirm方式

    针对一批消息的确认,虽然性能较高了,但控制就不是很精准了,使用时自己权衡。与单笔确认区别:就是控制确认的时机发生了变化,单笔确认就是一条一确认。

    1. import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.MessageProperties;
    5. import java.io.IOException;
    6. /**
    7. * @author rabbit
    8. * @version 1.0.0
    9. * @Description 2.批量 confirm 模式
    10. * @createTime 2022/07/27 19:34:00
    11. */
    12. public class BatchConfirmProducer {
    13. //生产者
    14. public static void main(String[] args) throws Exception {
    15. //1、获取connection
    16. Connection connection = RabbitCommonConfig.getConnection();
    17. //2、创建channel
    18. Channel channel = connection.createChannel();
    19. sendMsg(channel);
    20. //4、关闭管道和连接
    21. channel.close();
    22. connection.close();
    23. }
    24. private static void sendMsg(Channel channel) throws IOException, InterruptedException {
    25. //开启确认机制
    26. channel.confirmSelect();
    27. //3、发送消息到exchange
    28. String msg = "hello confirm";
    29. for (int i = 0; i < 10; i++) {
    30. channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
    31. }
    32. if (channel.waitForConfirms()) {
    33. System.out.println("生产者发布消息至Exchange成功!");
    34. } else {
    35. System.out.println("生产者发布消息至Exchange失败!请重试");
    36. }
    37. }
    38. }

    2.1.2.3 异步Confirm方式

    有的小伙伴会说,这还是慢啊,我发条消息还得等结果,发送性能难以保证,好,提供异步的方式,先往里面发,注册一个监听,靠异步返回的形式来确认消息的确发送成功了,若收到消息会有明确的成功失败,一直收不到监听返回,也可认为发送失败了

    1. import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.ConfirmListener;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.MessageProperties;
    6. import java.io.IOException;
    7. /**
    8. * @author rabbit
    9. * @version 1.0.0
    10. * @Description 3.异步confirm 模式
    11. * @createTime 2022/07/27 19:34:00
    12. */
    13. public class AsynConfirmProducer {
    14. //生产者
    15. public static void main(String[] args) throws Exception {
    16. //1、获取connection
    17. Connection connection = RabbitCommonConfig.getConnection();
    18. //2、创建channel
    19. Channel channel = connection.createChannel();
    20. sendMsg(channel);
    21. //4、关闭管道和连接
    22. channel.close();
    23. connection.close();
    24. }
    25. private static void sendMsg(Channel channel) throws IOException, InterruptedException {
    26. //开启确认机制
    27. channel.confirmSelect();
    28. //3、发送消息到exchange
    29. String msg = "hello confirm";
    30. for (int i = 0; i < 10; i++) {
    31. channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
    32. }
    33. //3.3、开启异步回调
    34. channel.addConfirmListener(new ConfirmListener() {
    35. @Override
    36. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    37. System.out.println("生产者发布消息至Exchange成功,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
    38. }
    39. @Override
    40. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    41. System.out.println("生产者发布消息至Exchange失败,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
    42. }
    43. });
    44. System.in.read();
    45. }
    46. }

    2.2 Broker内部过程

            2.2.1 Exchange发送至queue过程-Return机制

             好,保证了消息成功稳定的抵达了Exchange,那从Exchange路由到queue如何保证呢,请出 Return机制-采用Return机制来保证消息是否由exchange发送到了queue中。

    1. import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. /**
    5. * @author rabbit
    6. * @version 1.0.0
    7. * @Description 3.异步confirm 模式
    8. * @createTime 2022/07/27 19:34:00
    9. */
    10. public class AsynConfirmReturnProducer {
    11. //生产者
    12. public static void main(String[] args) throws Exception {
    13. //1、获取connection
    14. Connection connection = RabbitCommonConfig.getConnection();
    15. //2、创建channel
    16. Channel channel = connection.createChannel();
    17. sendMsg(channel);
    18. //4、关闭管道和连接
    19. channel.close();
    20. connection.close();
    21. }
    22. private static void sendMsg(Channel channel) throws IOException, InterruptedException {
    23. //开启确认机制
    24. channel.confirmSelect();
    25. //3、发送消息到exchange
    26. String msg = "hello confirm";
    27. for (int i = 0; i < 5; i++) {
    28. //3.2、发送消息,第三个设置为true才会有Return机制,默认为false
    29. //使用void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
    30. channel.basicPublish("", "no-lost", true, MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
    31. }
    32. //3.3、开启异步回调
    33. channel.addConfirmListener(new ConfirmListener() {
    34. @Override
    35. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    36. System.out.println("生产者发布消息至Exchange成功,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
    37. }
    38. @Override
    39. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    40. System.out.println("生产者发布消息至Exchange失败,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
    41. }
    42. });
    43. //3.1.2、开启Return机制
    44. channel.addReturnListener(new ReturnListener() {
    45. @Override
    46. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    47. //当送达失败是才会回调
    48. System.out.println(new String(body, "utf-8") + ",消息没有送达到queue中");
    49. }
    50. });
    51. System.in.read();
    52. }
    53. }

            2.2.2 queue存储过程

            好的,太不容易了,消息终于抵达了queue中,我们要保证消息不丢,需要将消息持久化,放内存中是不保险的, 这里有会有两个问题,

    第一 发送的消息是有属性的,需要设置 持久化标识。

    第二 创建的队列也是由属性的,也需要设置持久化, 

    这样才保证了消息的确持久化到了磁盘中。

    1. //deliveryMode =2 持久化消息
    2. channel.basicPublish("", "persist-show", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    3. /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
    4. public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    5. new BasicProperties("text/plain",
    6. null,
    7. null,
    8. 2,
    9. 0, null, null, null,
    10. null, null, null, null,
    11. null, null);
    12. //声明队列durable=true
    13. channel.queueDeclare("persist-show", true, false, false, null);
    14. Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    15. Map arguments) throws IOException;

    有的小伙伴会问,这样就会存储到磁盘么,如何证明?

    看面板:

     看磁盘存储

     文件内容:

    完整生产者代码:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 持久化
    5. * C:\Users\xxx\AppData\Roaming\RabbitMQ
    6. * @createTime 2022/07/27 19:34:00
    7. */
    8. public class PersistProducer {
    9. //生产者
    10. public static void main(String[] args) throws Exception {
    11. //1、获取connection
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. channel.queueDeclare("persist-show", true, false, false, null);
    16. for (int i = 0; i < Integer.MAX_VALUE; i++) {
    17. try {
    18. sendMsg(channel);
    19. } catch (Exception ex) {
    20. ex.printStackTrace();
    21. System.out.println("生产者发布消息失败!请重试");
    22. }
    23. Thread.sleep(1000);
    24. }
    25. //4、关闭管道和连接
    26. channel.close();
    27. connection.close();
    28. }
    29. private static void sendMsg(Channel channel) throws IOException, InterruptedException {
    30. //3、发送消息到exchange
    31. String msg = "i am a persist message";
    32. channel.basicPublish("", "persist-show", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    33. System.out.println("生产者发布消息成功");
    34. }
    35. }

    2.3 消费者消费过程-消费端确认

    太不容易了,整了半天终于到消费了,前面所有的努力都是为成功的消费呀,消费这里确保消费成功咯。要回到前面所讲过的 消费端的ACK了。【rabbitmq模式中-WORK模式提及】

    也就是开启消费端的手工确认机制-每次成功消费消息后,明确告知mq,消费成功了,你可以删除掉这条消息了。

    若果没告知mq,那这条消息就一直会处于unacked状态,等消费者重启会再次消费。

    所以消费完成后一定要明确告知 Broker,是 Ack ,还是NAck 还是Reject 了。

    2.3.1 消费确认异常再次放回队列执行,【可设定次数-N次之后就不要重试了-直接Ack了】

    1. import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
    2. import com.rabbitmq.client.*;
    3. import lombok.SneakyThrows;
    4. import java.io.IOException;
    5. import java.util.HashMap;
    6. import java.util.Map;
    7. /**
    8. * @author rabbit
    9. * @version 1.0.0
    10. * @Description 消息确认
    11. * @createTime 2022/07/27 19:36:00
    12. */
    13. public class ConfirmConsumer1 {
    14. private static final Map errorMap = new HashMap<>();
    15. //消费者
    16. public static void main(String[] args) throws Exception {
    17. //1、获取连对象、
    18. Connection connection = RabbitCommonConfig.getConnection();
    19. //2、创建channel
    20. Channel channel = connection.createChannel();
    21. //3、创建队列
    22. channel.queueDeclare("no-lost", true, false, false, null);
    23. //4.开启监听Queue
    24. DefaultConsumer consumer = new DefaultConsumer(channel) {
    25. @SneakyThrows
    26. @Override
    27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    28. System.out.println("接收到消息:" + new String(body, "UTF-8"));
    29. try {
    30. //具体业务
    31. int i = 1 / 0;
    32. //确认
    33. channel.basicAck(envelope.getDeliveryTag(), false);
    34. } catch (Exception e) {
    35. if (errorMap.get(new String(body, "UTF-8")) != null) {
    36. System.out.println("消息已重复处理失败,拒绝再次接收...");
    37. channel.basicReject(envelope.getDeliveryTag(), false);
    38. } else {
    39. System.out.println("消息即将再次返回队列处理...");
    40. channel.basicNack(envelope.getDeliveryTag(), false, true);
    41. errorMap.put(new String(body, "UTF-8"), 1);
    42. }
    43. }
    44. }
    45. };
    46. channel.basicConsume("no-lost", false, consumer);
    47. System.out.println("消费者开始监听队列");
    48. //5、键盘录入,让程序不结束!
    49. System.in.read();
    50. //6、释放资源
    51. channel.close();
    52. connection.close();
    53. }
    54. }

    2.3.2 消费确认异常后直接ACK,会再finally块,记录此次异常记录,后续和发送端结合再次发起重试

    1. import com.longfor.apidemos.rabbit.common.RabbitCommonConfig;
    2. import com.rabbitmq.client.*;
    3. import lombok.SneakyThrows;
    4. import java.io.IOException;
    5. import java.util.HashMap;
    6. import java.util.Map;
    7. /**
    8. * @author rabbit
    9. * @version 1.0.0
    10. * @Description 消息确认
    11. * @createTime 2022/07/27 19:36:00
    12. */
    13. public class ConfirmConsumer2 {
    14. //消费者
    15. public static void main(String[] args) throws Exception {
    16. //1、获取连对象、
    17. Connection connection = RabbitCommonConfig.getConnection();
    18. //2、创建channel
    19. Channel channel = connection.createChannel();
    20. //3、创建队列
    21. channel.queueDeclare("no-lost", true, false, false, null);
    22. //4.开启监听Queue
    23. DefaultConsumer consumer = new DefaultConsumer(channel) {
    24. @SneakyThrows
    25. @Override
    26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    27. System.out.println("接收到消息:" + new String(body, "UTF-8"));
    28. try {
    29. //具体业务
    30. int i = 1 / 0;
    31. } catch (Exception e) {
    32. e.printStackTrace();
    33. } finally {
    34. //确认
    35. channel.basicAck(envelope.getDeliveryTag(), false);
    36. }
    37. }
    38. };
    39. channel.basicConsume("no-lost", false, consumer);
    40. System.out.println("消费者开始监听队列");
    41. //5、键盘录入,让程序不结束!
    42. System.in.read();
    43. //6、释放资源
    44. channel.close();
    45. connection.close();
    46. }
    47. }

    2.3.3 消费异常后使用死信队列-再死信队列里单独处理

    死信队列后续单独介绍

    原理也很简单就是将未消费的消息,再次转发到了一个Exchange,Exchange再路由到对应的Queue中去而已。

    3.最佳实践

    至此,兄弟们,若想保证消息不丢失,我们需要做这么多事情,各种环节,保障各种异常情况。是不是头都大了。代码复杂度高,性能也没法保证。

    真正的实际应用中有各个环节都保证消息防丢的使用么?有的同学可以留言讨论下。 

    本人在实际应用中,包括看大部分的同学的代码,都不会做这么多的机制去保证消息防丢,最多是做一个消费端的ACK。

    甚至很多同学把MQ这里面的内容当做一个黑盒,或根本就不了解MQ里面的原理,只需要知道发送和接收,就能用起来,但发生了问题也必将麻烦。

    推荐方式:预警+补偿=最终一致性

    这里面我们引入一个实际的案例: 

    场景描述:发红包,领取红包的人若超过一定限额,将红包退回。【不用关注是否合理】

     那这里面我实际没关注,mq过程中任何保证消息丢失的措施,完全把mq黑盒化。

    最核心的内容是 设定了一个超退状态,消费完成后把这个状态置为已完成即可,

    若此状态在一定时间后【例如5分钟】还没发生变化,那我们会JOB 扫描到再次发起重试,做好幂等即可。

    仍补偿多次补偿不成功的,可以记录 超退失败,通过预警机制预警处理,人为干预即可。

    当然场景不同,运用方式不同,还是需要结合自己实际的业务,选择合适的方式,来保证消息的准确、稳定、及时的投递。

  • 相关阅读:
    HAL库笔记(重要库函数)
    一文弄懂 if __name__ == “__main__“:(洒洒水啦!)
    视频剪辑技巧:简单步骤,批量剪辑并随机分割视频
    spring-cloud-gateway启动失败以及springboo和springcloud版本对应关系总结
    基于Docker的JMeter分布式压测实战讲解
    初识Java 11-1 函数式编程
    数据挖掘——关联规则理论部分
    docker部署nginx+反向代理配置/代理宿主机网段服务器
    安达发|制造企业生产排产现状和APS系统的解决方案
    Git 保姆级使用教程
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127934520