• RabbitMQ消息确认机制


    保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。
    在这里插入图片描述

    • publisher confirmCallback 确认模式
    • publisher returnCallback 未投递到 queue 退回模式
    • consumer ack机制

    生产端确认

    ConfirmCallback

    • 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback
    • CorrelationData:用来表示当前消息唯一性,例如使用UUID
    • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback
    • 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback

    在springboot的application配置文件中添加开启配置

    publisher-confirm-type: correlated
    
    • 1

    在配置类中添加初始化RabbitTemplate的方法

    	@Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void initRabbitTemplate() {
            /**
             * 设置ConfirmCallBack回调 produce -> broker
             * correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
             * ack 消息是否成功收到 只要消息抵达 ack就为true
             * cause 失败的原因
             */
            this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallBack...correlationData[{}]==>ack[{}]==>cause[{}]",correlationData,ack,cause);
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    ReturnCallback

    • confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式
    • 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据

    在springboot的application配置文件中添加开启配置

        publisher-returns: true
        # 优先回调returns
        template:
          mandatory: true
    
    • 1
    • 2
    • 3
    • 4

    在配置类中添加初始化RabbitTemplate的方法

       @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void initRabbitTemplate() {
        
            this.rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                /**
                 * message 投递失败的消息详细信息
                 * replayCode 回复的状态码
                 * replayText 回复的文本内容
                 * exchange 当时这个消息发给哪个交换机
                 * routingKey 当时这个消息用哪个路由键
                 */
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    log.info("ReturnsCallback...message[{}]==>replayCode[{}]==>replayText[{}]==>exchange[{}]==>routingKey[{}]",
                            returned.getMessage(),returned.getReplyCode(),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在这里插入图片描述

    服务端确认

    Ack

    问题:

    • 默认是自动确认的,只有消息接收到,客户端会自动确认,服务端就会移出这个消息。但是当接受了许多消息时,可能会在只处理了一条消息后宕机,这个时候会丢失其它未处理的消息

    解决方法:配置手动ack确认

    • 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
    • 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
    • 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户 端断开,消息不会被broker移除,会投递给别人

    在springboot的application配置文件中添加开启配置

        listener:
          direct:
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
  • 相关阅读:
    踩坑了!0作为除数,不一定会抛出异常!
    Windows bat 脚本注册系统服务教程详解
    pb:10个用一条语句写成的有关日期函数
    SpringMVC的执行流程
    行为型-中介者模式
    .NET 5 ORM 八大实用技巧 干货 - SqlSugar ORM
    大厂同事不会教你的git高级命令,今天我教你
    SAP 通过游标来分批从数据库表读取2G数据
    C 变量和类型
    【嵌入式Linux应用开发】温湿度监控系统——多线程与温湿度的获取显示
  • 原文地址:https://blog.csdn.net/weixin_52467834/article/details/126549864