• 消息队列-RabbitMQ-消息确认机制


    为了保证消息的不丢失,可靠抵达,可以使用事务消息,但是性能会下降250倍,为此引入确认机制。

    在这里插入图片描述

    1.ConfirmCallBack

    服务器收到消息就回调
    ● 被broker接收到只能表示Message已经到达服务器,并不能保证消息一定会投递到目标queue里面
    ● 消息只要被broker接收就会执行confirmCallBack(定制RabbitTemplate),如果是cluster模式,需要所有的broker都接收到才会调用confirmCallBack
    ● 在配置文件中加入:

    spring.rabbitmq.publisher-confirms=true
    

    在创建 connectionFactory 的时候设置 PublisherConfirms (true) 选项,开启 ConfirmCallback ,因此我们可以定制 RabbitTemplate ,在 config 包下,创建 MyRabbitConfig 配置类,设置确认回调,需要编写如下方法

    //设置确认回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    
        /**
         *
         * 1、只要消息抵达Broker就ack=true
         * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
         * @param ack  消息是否成功收到
         * @param cause 失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
            /**
             * 1、做好消息确认机制(pulisher,consumer【手动ack】)
             * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
             */
            //服务器收到了;
            //修改消息的状态
            System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
        }
    });
    

    2. ReturnCallback

    消息抵达队列进行回调

    1、在 application.properties 中进行配置:

    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    

    2、设置确认回调ReturnCallback

    //设置消息抵达队列的确认回调
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调(只有失败才会回调???)
         * @param message   投递失败的消息详细信息
         * @param replyCode 回复的状态码
         * @param replyText 回复的文本内容
         * @param exchange  当时这个消息发给哪个交换机
         * @param routingKey 当时这个消息用哪个路由键
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            //报错误了。修改数据库当前消息的状态->错误。
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]===>exchange["+exchange+"]===>routingKey["+routingKey+"]");
        }
    });
    

    3.Ack消息确认机制

    • 消费者获取到消息,成功处理,可以回复 Ack 给 Broker
    • basic.ack 用于肯定确认,broker 将移除此消息
    • basic.nack 用于否定确认,可以指定 broker 是否丢弃此消息,可以批量
    • basic.reject 用于否定确认,同上,但不能批量
    • 默认自动ack,消息被消费者收到,就会从broker的queue中移除
  • 相关阅读:
    通关剑指 Offer——剑指 Offer II 013. 二维子矩阵的和
    (黑马出品_03)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
    1 操作系统任务调度问题----来源刘H同学
    数据结构-堆的实现及应用(堆排序和TOP-K问题)
    增强现实(AR)开发框架
    Go语言 接口与类型
    比特币的蒙提霍尔问题
    leetcode: 二叉树的层序遍历
    未来智能汽车数字化转型之路必不可少强大的高级排产软件
    数据挖掘算法原理与实践:数据预处理
  • 原文地址:https://blog.csdn.net/Nanki_/article/details/139723522