• RabbitMQ 保证消息可靠性


    消息确认机制逻辑

    1、消息抵达MQ服务器,会触发ConfirmCallback这个方法.
    2、消息从交换机抵达对应的队列,成功不会触发,失败了会触发ReturnCallback这个方法.
    3、消费者消费了消息之后,需要执行签收动作,也就是使用 basicAck() 手动签收,之后这个消息会从服务器删掉。

    在这里插入图片描述

    修改配置

    1、发送端确认回调 生产者到broker服务端 
    # 消息抵达服务器 就会触发回调
    spring.rabbitmq.publisher-confirms=true
    2、发送端确认回调 exchange 到 queue 投递成功不会触发,失败才会触发
    # 开启发送端消息抵达队列确认机制 exchange 到 queue
    spring.rabbitmq.publisher-returns=true
    # 如果消息抵达了队列 就会优先执行这个回调  以异步的模式
    spring.rabbitmq.template.mandatory=true
    3、消费端确认机制
    # 开启手动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    代码实现

    @Slf4j
    @Configuration
    public class MyRabbitConfig {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 自定义序列化方式 将字节码换成json
         *
         * @return
         */
        @Bean
        public MessageConverter converter() {
            return new Jackson2JsonMessageConverter();
        }
    
        // MyRabbitConfig对象创建完成之后,执行这个方法
        // Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
        @PostConstruct
        public void initRabbit() {
    
            // 设置发送端确认回调 生产者到broker服务端
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * 设置发送端确认回调 生产者到broker服务端
                 * 只要消息抵达服务器 就会触发回调  spring.rabbitmq.publisher-confirms=true
                 * @param correlationData 当前消息的唯一ID
                 * @param ack 消息是否成功或失败 true/false
                 * @param cause 失败的原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("消息抵达服务端:confirm:correlationData:{}", correlationData);
                    log.info("消息抵达服务端:confirm:ack:{}", ack);
                    log.info("消息抵达服务端:confirm:cause:{}", cause);
    
                }
            });
    
            // 设置发送端确认回调 exchange 到 queue 投递成功不会触发,失败才会触发
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * 消息抵达队列触发回调 exchange 到 queue ,只有消息没有到达队列才会触发这个回调,类似失败回调
                 * # 开启发送端消息抵达队列确认机制 exchange 到 queue
                 * spring.rabbitmq.publisher-returns=true
                 * # 如果消息抵达了队列 就会优先执行这个回调  以异步的模式
                 * spring.rabbitmq.template.mandatory=true
                 * @param message 投递失败消息的详细内容
                 * @param replyCode 回复的状态码
                 * @param replyText 回复的文本内容
                 * @param exchange 当时这个消息发送给哪个交换机
                 * @param routingKey 当时这个消息指定的路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.info("失败的消息:returnedMessage:message:{}", message);
                    log.info("失败的消息:returnedMessage:replyCode:{}", replyCode);
                    log.info("失败的消息:returnedMessage:replyText:{}", replyText);
                    log.info("失败的消息:returnedMessage:exchange:{}", exchange);
                    log.info("失败的消息:returnedMessage:routingKey:{}", routingKey);
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
             // 消费端确认机制 保证每一个消息被正确消费 此时才能broker删除这个消息
            // # 手动确认
            // spring.rabbitmq.listener.simple.acknowledge-mode=manual
            // 默认是自动确认的 消费端只要消息接收到,消费端会自动确认,服务端就会移除这个消息
            // 问题:收到很多消息,自动回复给服务器ACK 只有一个消息处理成功,服务宕机了,发生了消息丢失,原因是自动确认了
            // 手动确认:只要没有明确告诉MQ,消息没有被签收,没有ACK MQ就会一直存在 UACK状态,即使服务器宕机消息也不会丢失,会重新变为Ready状态,下次
            //         有新的消费者就会把消息发送给他,保证消费者消息不丢失。
            //         basicAck() 手动签收 业务成功完成
            //         basicNack() 拒签 业务失败 拒签
    
    
    @RabbitHandler
    public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        System.out.println(message);
        try {
            channel.basicAck(tag,false);            // 确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    如何使用API接口获取商品数据,从申请API接口、使用API接口到实际应用,一一讲解
    南卡Pro4骨传导耳机!新一代「響」科技,音质降漏“双冠军”采用自研骨振子技术
    Databricks 入门之sql(二)常用函数
    SSL证书安装教程
    小节9:Python之numpy
    C++优先级队列
    《心流》摘录与笔记
    Opencv | 二值化操作
    SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.15 SpringBoot 整合 task
    c++:蓝桥杯中的基础算法1(枚举,双指针)
  • 原文地址:https://blog.csdn.net/weixin_43705313/article/details/126380367