• RabbitMQ系列【10】死信队列


    有道无术,术尚可求,有术无道,止于术。

    概念

    无法被消费的消息被称为死信,存放死信的队列也就是死信队列

    由于某些特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信

    例如消费消息时,发生异常,经过一定次数的重试后,该消息依然无法被正常消费,此时可以将该消息放入死信队列中,后续进行人工干预。

    消息成为死信的三种情况:

    1. 队列消息长度到达限制
    2. 消费者异常拒接消费消息
    3. 原队列存在消息过期设置,消息到达超时时间未被消费

    一个简单的死信处理流程图如下:
    在这里插入图片描述
    流程图说明:

    1. 生产者投递消息,消费者监听队列消息
    2. 产生死信消息时,投递到死信队列
    3. 死信消费者消费死信消息,存入到数据库,并进行人工干预处理

    创建死信交换机、队列

    死信交换机、死信队列需要我们自己创建,只是业务中用来存放死信的“特殊”交换机队列。其他队列可以指定死信交换机、死信队列,当发生死信时,自动将其投递到死信队列中。

    创建死信交换机、死信队列

    @Configuration
    public class RabbitMqDeadQueueConfig {
    
        private static final String DEAD_QUEUE = "deadQueue";
    
        private static final String DEAD_EXCHANGE = "deadExchange";
    
        private static final String DEAD_ROUTE_KEY = "dead.key";
    
        /**
         * 死信队列
         */
        @Bean(DEAD_QUEUE)
        public Queue deadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        /**
         * 死信交换机
         */
        @Bean(DEAD_EXCHANGE)
        public Exchange deadExchange() {
            return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
        }
    
        /**
         * 创建死信队列和死信交换机的绑定关系
         */
        @Bean("deadBinding")
        public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
            return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    创建正常的业务消费队列,并指定指定死信交换机、路由KEY

    @Configuration
    public class RabbitMqConfig {
    
        private static final String DEAD_QUEUE = "deadQueue";
    
        private static final String DEAD_EXCHANGE = "deadExchange";
    
        private static final String DEAD_ROUTE_KEY = "dead.key";
    
        /**
         * 使用 ExchangeBuilder 创建交换机
         */
        @Bean("bootExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
        }
    
        /**
         * 创建队列:
         * 指定死信交换机、路由KEY
         */
        @Bean("bootQueue")
        public Queue bootQueue001() {
            return QueueBuilder.durable("bootQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    创建一个死信消费者,消费死信,存入到数据库,等待人工干预处理:

        @RabbitListener(queues = {"deadQueue"})
        public void receiveMessage001(Message message) {
            System.out.println("收到死信消息" + new String(message.getBody()));
            System.out.println("存入数据库,等待人工干预");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    过期导致死信

    首先模拟过期导致死信,注释正常业务消费者代码,发送一条TTL 消息:

            // 1. 消息过期
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("10000");
            Message message = new Message("HELLO TTL".getBytes(), messageProperties);
            rabbitTemplate.send("bootExchange", "boot.key", message);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    到了过期时间后,由于没有消费者消费该消息,成为死信,最终被死信消费者接收到:

    在这里插入图片描述

    拒接消费

    在之前,我们介绍过消息确认ACK机制,当拒收消息时,也可以放入死信队列中。

    参考上面的文档,开启手动确认模式,可以看到拒接消息后,也会存放到死信队列中:
    在这里插入图片描述

    长度限制

    创建队列,指定长度为1,当队列中的消息已经达到了这个最大长度限制时,再次投递,消息将被挤掉,被挤掉的会进入死信队列。

        /**
         * 创建队列:
         * 指定死信交换机、路由KEY、数量限制
         */
        @Bean("bootQueue")
        public Queue bootQueue() {
            return QueueBuilder.durable("bootQueue").maxLength(1).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    批量发送一百条消息:

            Message message = new Message("HELLO WORLD".getBytes(StandardCharsets.UTF_8));
            for (int i = 0; i < 100; i++) {
                rabbitTemplate.send("bootExchange", "boot.key", message);
            }
    
    • 1
    • 2
    • 3
    • 4

    关闭业务监听消费者,运行发送消息,可以看到大量消息进入死信队列:
    在这里插入图片描述
    查看控制台,只有第一条消息被保存在队列中:
    在这里插入图片描述

  • 相关阅读:
    1583 - Digit Generator (UVA)
    Pytorch模型转ONNX部署
    DVWA系列4:XSS 跨站脚本攻击之 DOM型 和 反射型
    (16)UiBot:智能化软件机器人(以头歌抓取课程数据为例)
    【CHI】CHI协议,transaction事务汇总
    redis
    C语言const修饰指针变量
    ARC123E Training
    Haproxy负载均衡集群
    [LeetCode周赛复盘] 第 318 场周赛20221107
  • 原文地址:https://blog.csdn.net/qq_43437874/article/details/127779461