• RabbitMQ之死信队列


    1. 死信队列概念

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

    死信来源:

    1. 消息 TTL 过期
    2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false,不回放到消息队列中)

    在这里插入图片描述


    2. 消息 TTL 过期

    生产者代码:

    package cn.edu.xd.five;
    
    import cn.edu.xd.util.RabbitMQUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer {
        private static final String NORMAL_EXCHANGE="normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel= RabbitMQUtils.getChannerl();
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //设置消息的TTL时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
            for(int i=1;i<=10;i++){
                String msg="info"+i;
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes());
                System.out.println("生产者发送消息:"+msg);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    消费者1代码。消费者1代码中将正常交换机和死信交换机建立了联系,当正常队列中的消息无法被消费超时时,消息就会被移动到死信队列中

    package cn.edu.xd.five;
    
    import cn.edu.xd.util.RabbitMQUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer01 {
        private static final String NORMAL_EXCHANGE="normal_exchange";
        private static final String DEAD_EXCHANGE="dead_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel= RabbitMQUtils.getChannerl();
            //声明死信交换机和普通交换机 类型为 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            String deadQueue="dead_queue";
            channel.queueDeclare(deadQueue,false,false,false,null);
            channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
    
            //声明普通队列
            Map<String, Object> params = new HashMap<>();
            //正常队列设置死信交换机 参数 key 是固定值
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //正常队列设置死信 routing-key 参数 key 是固定值
            params.put("x-dead-letter-routing-key", "lisi");
            String normalQueue = "normal-queue";
            channel.queueDeclare(normalQueue, false, false, false, params);
            channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
    
    
            System.out.println("Consumer01等待接收消息...");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody());
                System.out.println("Consumer01从Q1接受到的消息:"+message);
            };
            //取消消费的一个回调接口 如在消费的时候队列被删除掉了
            CancelCallback cancelCallback = s -> {
                System.out.println("消息被中断");
    
            };
           channel.basicConsume(normalQueue,true,deliverCallback,cancelCallback);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    运行:先运行Consumer1, 然后将其关闭,此时打开web管理界面可以看到产生了两个队列,然后开启Consumer02, 然后再开启Producer, 等待10s钟,消息由正常队列移动到了死信队列中
    在这里插入图片描述
    在这里插入图片描述


    3. 队列达到最大长度

    生产者代码修改:
    在这里插入图片描述
    消费者1代码修改:
    在这里插入图片描述

    运行:启动消费者1,关闭,然后依次启动消费者2和生产者,从运行结果中可以看到消费者2从死信队列中消费了4条消息

    在这里插入图片描述


    4. 消息被拒

    生产者代码和3中一样
    修改生产者1代码,取消自动应答,并修改为拒绝消息info5, 同时取消3中设置的队列长度为6
    在这里插入图片描述
    在这里插入图片描述

    运行:启动消费者1,然后启动消费者2,然后启动生产者

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    Odoo16—权限控制
    信创优选,国产开源。Solon v2.5.11 发布
    Tomcat中间件版本信息泄露
    Spring源码分析之AOP
    PHP - 各版本对比 - 整理
    QT day4
    力扣16题 ~ 最接近的三数之和
    人工神经网络原理与实践,人工神经网络实战教程
    Nginx配置反向代理
    校招程序员无项目经验如何破局
  • 原文地址:https://blog.csdn.net/qq_43478694/article/details/125509433