• Spring-RabbitMQ 消费者消息确认案例实践


    Springboot 版本: 2.7.0

    消费者消息确认模式分类

    1. NONE:等同于rabbitMQ客户端的自动确认,只要投递了就认为是成功的。
    2. MANUAL:需要用户通过 channel 的 ack/nack 手动确认。
    3. AUTO(默认值):自动模式,消费者正常执行结束认为成功,报错认为失败。

    代码实现

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        public final static String TOPIC_EXCHANGE = "myExchange";
        public final static String QUEUE_NAME = "myQueue";
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(jsonConverter());
            template.setExchange(TOPIC_EXCHANGE);
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息:{}发送成功", correlationData.getId());
                } else {
                    log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
                }
            });
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
            });
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    
        @Bean
        public Jackson2JsonMessageConverter jsonConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    生产者:

    
    @Component
    public class PublisherService {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        public void send(){
    
            CorrelationData correlationData = new CorrelationData();
            rabbitTemplate.convertAndSend("my.test.message", new User("Kleven", 18), correlationData);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class User implements Serializable {
        private static final long serialVersionUID = -5079682733940745661L;
    
        private String name;
        private Integer age;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    模式一、NONE

    当确认模式设置为NONE时,只要中间件投递了消息就认为成功并将消息从队列中移除。

        @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter", ackMode = "NONE")
        public void noneAckListener(User user) {
            log.info("收到消息 -> {}", user);
            // 添加个错误用于测试
            int a = 1 / 0;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    结果:
    可以看到,即使消费者出错了,队列中的消息依然被删除了。Spring-RabbitMQ 消费者消息确认案例实践

    模式二、MANUAL

    channel.basicAck 确认一个或多个消息

    /**
    * @param deliveryTag 当前消息的投递标签,是一个自增的数字。
    * @param multiple true:确认 deliveryTag <= 当前消息deliveryTag 的所有消息; false:只确认当前收到的消息。
    */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
        @Autowired
        private Jackson2JsonMessageConverter jsonConverter;
    
        @RabbitListener(queues = "myQueue", ackMode = "MANUAL")
        public void manualAckListener(Message message, Channel channel) throws IOException {
    
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            log.info("成功消费消息 -> {}", jsonConverter.fromMessage(message));
    
            channel.basicAck(deliveryTag, false);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结果:
    消息消费成功,且从队列中删除。

    消息:aaa9b3b7-85b4-42fb-8a12-0aad488817f1发送成功
    成功消费消息 -> User(name=Kleven, age=18)
    
    • 1
    • 2

    Spring-RabbitMQ 消费者消息确认案例实践

    channel.basicNack 拒绝一个或多个消息

        /**
         *
         * @param multiple 拒绝 deliveryTag <= 当前消息deliveryTag 的所有消息; false:只拒绝当前收到的消息。
         * @param requeue true 将拒绝对的消息重新加入队列。
         */
        void basicNack(long deliveryTag, boolean multiple, boolean requeue)
                throws IOException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
        @Autowired
        private Jackson2JsonMessageConverter jsonConverter;
    
        @RabbitListener(queues = "myQueue", ackMode = "MANUAL")
        public void manualAckListener(Message message, Channel channel) throws IOException {
    
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            log.info("消费消息 -> {}", jsonConverter.fromMessage(message));
    
            channel.basicNack(deliveryTag, false, true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结果:
    当 requeue 为 true时,拒绝消息后消息从重新入队,可以看到队列中任然有一条数据。
    当 requeue 为 false时,拒绝消息后消息也还是从队列中删除掉了。

    Spring-RabbitMQ 消费者消息确认案例实践

    模式三、AUTO

    默认值,消费者成功时认为成功并从队列中删除消息。消费者失败时认为失败,不会从队列中删除消息。

        @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter")
        public void autoAckListener(User user) {
            log.info("收到消息 -> {}", user);
            // 添加个错误用于测试
            int a = 1 / 0;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    结果:
    可以看到,消费者出错后,消息依然在队列中。当移除消费者中的错误代码后,成功消费消息后,队列中的数据被删除。
    Spring-RabbitMQ 消费者消息确认案例实践

  • 相关阅读:
    设计模式~解释器模式(Interpreter)-19
    百度地图、高德地图和腾讯地图定位不准确的解决方案
    可观测平台如何存储时序曲线?滴滴实践全历程分享
    Pinia(四)了解和使用getters
    测试架构师应该做和不应该做的事情
    Thinkphp漏洞详解合集
    「AI反诈与智能风控」闭门研讨会报名丨青源Workshop第26期
    Node.js知识点
    函数栈帧深度剖析(一篇带你牢牢掌握函数栈帧)
    拉线位移编码器要检查机械装置的安装状态
  • 原文地址:https://blog.csdn.net/u012359704/article/details/126381251