• 消息队列—RabbitMQ如何保证消息可靠性?


    1. 如何保证消息的可靠性?

    先来看看我们的万年老图,从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。

    • 生产者发出后保证到达了MQ。
    • MQ收到消息保证分发到了消息对应的Exchange。
    • Exchange分发消息入队之后保证消息的持久性。
    • 消费者收到消息之后保证消息的正确消费。

    经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

    2. 生产者发送消息到MQ失败

    我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。

    为了解决这个问题,RabbitMQ引入了事务机制发送方确认机制(publisher confirm)

    • 方法一:生产者在发送数据之前开启RabbitMQ的事务(采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。)
    • 方法二:开启confirm模式(使用springboot时在application.yml配置文件中做如下配置,实现confirm回调接口,生产者发送消息时设置confirm回调)
    • 小结:事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。

    3. MQ接收失败或者路由失败

    生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:

    1. 消息找不到对应的Exchange。
    2. 找到了Exchange但是找不到对应的Queue。

    这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端

    我们既然要做可靠性,当然是设置为返回到客户端。


    配置:

    1. spring:
    2. rabbitmq:
    3. addresses: 127.0.0.1
    4. host: 5672
    5. username: guest
    6. password: guest
    7. virtual-host: /
    8. # 打开消息确认机制
    9. publisher-confirm-type: correlated
    10. # 打开消息返回
    11. publisher-returns: true
    12. template:
    13. mandatory: true

    我们只需要在配置里面打开消息返回即可,template.mandatory: true这一步不要少~

    生产者:

    1. public void sendAndReturn() {
    2. User user = new User();
    3. log.info("Message content : " + user);
    4. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    5. log.info("被退回的消息为:{}", message);
    6. log.info("replyCode:{}", replyCode);
    7. log.info("replyText:{}", replyText);
    8. log.info("exchange:{}", exchange);
    9. log.info("routingKey:{}", routingKey);
    10. });
    11. rabbitTemplate.convertAndSend("fail",user);
    12. log.info("消息发送完毕。");
    13. }

    这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。

    4. 消息入队之后MQ宕机

    到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。

    消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

    1. @Bean
    2. public DirectExchange directExchange() {
    3. // 三个构造参数:name durable autoDelete
    4. return new DirectExchange("directExchange", false, false);
    5. }
    6. @Bean
    7. public Queue erduo() {
    8. // 其三个参数:durable exclusive autoDelete
    9. // 一般只设置一下持久化即可
    10. return new Queue("erduo",true);
    11. }

    创建Exchange和队列时只要设置好持久化,发送的消息默认就是持久化消息。

    设置持久化时一定要将Exchange和队列都设置上持久化:

    单单只设置Exchange持久化,重启之后队列会丢失。单单只设置队列的持久化,重启之后Exchange会消失,既而消息也丢失,所以如果不两个一块设置持久化将毫无意义。

    Tip: 这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。

    5. 消费者无法正常消费

    最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。

    1. spring:
    2. rabbitmq:
    3. addresses: 127.0.0.1
    4. host: 5672
    5. username: guest
    6. password: guest
    7. virtual-host: /
    8. # 手动确认消息
    9. listener:
    10. simple:
    11. acknowledge-mode: manual

    打开手动消息确认(手动ACK)之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。

    当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。

    所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。

    幂等性相关内容不在本章讨论范围~所以我就不多做阐述了。

    6. 消息可靠性案例

    这个例子中的消息是先入库的,然后生产者从DB里面拿到数据包装成消息发给MQ,经过消费者消费之后对DB数据的状态进行更改,然后重新入库。

    这中间有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时任务不停的去刷库,找到有问题的数据将它重新扔到生产者那里进行重新投递。

    这个方案其实和网上的很多方案大同小异,基础的可靠性保证之后,定时任务做一个兜底进行不断的扫描,力图100%可靠性。

  • 相关阅读:
    mnist手写数字识别,dnn实现代码解读
    Linux软件包管理— yum命令
    internship:熟悉项目代码的几个步骤
    Linux下lsof命令使用
    AnolisOS升级SSH,不升级SSL
    数据结构与算法课后题-第二章(顺序表)
    十年数据库专家,耗尽3年心血整理的Mycat中间实战笔记,绝对干货
    华为智慧屏 招一招即可分享运动状态,搜索运动教程,同时还可通过手机操控智慧屏
    物联网开发笔记(49)- 使用Micropython开发ESP32开发板之控制RGB全彩LED模块
    2020年6月编程Scratch一级
  • 原文地址:https://blog.csdn.net/weixin_62079735/article/details/136838996