• (十)死信队列


    1、概念

    某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有
    后续的处理,就变成了死信,有死信自然就有了死信队列

    **应用场景:**为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效,这种就放入死信队列进行处理。

    2、死信产生的原因

    • 消息 TTL(Time to Live存活时间) 过期
    • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

    3、代码实现

    3.1. 流程图

    在这里插入图片描述

    3.2. 消息TTL 过期

    消费者1 (最复杂的)

    package com.feng.deadQueue;
    
    import com.feng.utils.RabbitMQUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Delivery;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author Feng
     * @Date 2022/11/25 15:09
     * @Version 1.0
     * @Description 死信队列消费者01
     */
    public class Consumer01 {
    
        //普通交换机名字
        public static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机名字
        public static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列名
        public static final String NORMAL_QUEUE = "normal_queue";
        //死信队列名
        public static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtil.getChannel();
            //声明交换机:普通和死信
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            /**
             *  声明普通队列
             */
            //设置声明队列的参数
            Map<String, Object> arguments = new HashMap<>();
            //设置过期时间,单位是毫秒,这里设置10秒
            arguments.put("x-message-ttl",10000);
            //设置队列的死信交换机
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key","lisi");
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机与队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
            //绑定死信换机与死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
            channel.basicConsume(NORMAL_QUEUE,true,(String consumerTag, Delivery message) -> {
                System.out.println("消费者1接收消息是:" + new String(message.getBody(), "UTF-8"));
                System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
            }, consumerTag -> {});
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    生产者

    package com.feng.deadQueue;
    
    import com.feng.utils.RabbitMQUtil;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author Feng
     * @Date 2022/11/25 15:37
     * @Version 1.0
     * @Description 死信队列生产者
     */
    public class ProductDead {
        //普通交换机名字
        public static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtil.getChannel();
            //死信消息,设置ttl时间
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                    .builder()
                    .expiration("10000")//单位是ms
                    .build();
            for (int i = 1; i < 11; i++) {
                String msg = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    只需要启动消费者01再宕机,然后启动生产者发消息,消息内容就会从普通队列到死信队列

    在这里插入图片描述
    在这里插入图片描述

    死信队列消费者

    package com.feng.deadQueue;
    
    import com.feng.utils.RabbitMQUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Delivery;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author Feng
     * @Date 2022/11/25 15:09
     * @Version 1.0
     * @Description 死信队列消费者02
     */
    public class Consumer02 {
        //死信交换机名字
        public static final String DEAD_EXCHANGE = "dead_exchange";
        //死信队列名
        public static final String DEAD_QUEUE = "dead_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtil.getChannel();
            channel.basicConsume(DEAD_QUEUE,true,(String consumerTag, Delivery message) -> {
                System.out.println("消费者1接收消息是:" + new String(message.getBody(), "UTF-8"));
                System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
            }, consumerTag -> {});
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    3.3. 队列达到最大长度

    声明队列的时候设置参数就行如下

    //设置队列长度限制,这里是6
    arguments.put("x-max-length",6);
    
    • 1
    • 2

    注意此时需要把原先队列删除 因为参数改变了

    在这里插入图片描述

    发十条消息测试如下

    在这里插入图片描述
    这里注意超出队列长度进入死信队列的消息是先入队的消息

    3.4. 消息被拒

    • 消息生产者代码同上生产者一致
    • C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息。让消息阻塞在队列中)
    package com.feng.deadQueue;
    
    import com.feng.utils.RabbitMQUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Delivery;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author Feng
     * @Date 2022/11/25 15:09
     * @Version 1.0
     * @Description 死信队列消费者01
     */
    public class Consumer01 {
    
        //普通交换机名字
        public static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机名字
        public static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列名
        public static final String NORMAL_QUEUE = "normal_queue";
        //死信队列名
        public static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtil.getChannel();
            //声明交换机:普通和死信
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            /**
             *  声明普通队列
             */
            //设置声明队列的参数
            Map<String, Object> arguments = new HashMap<>();
            //设置过期时间,单位是毫秒,这里设置10秒
    //        arguments.put("x-message-ttl",10000);
            //设置队列的死信交换机
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key","lisi");
            //设置队列长度限制
    //        arguments.put("x-max-length",6);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机与队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
            //绑定死信换机与死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
            //开启手动应答
            channel.basicConsume(NORMAL_QUEUE,false,(String consumerTag, Delivery message) -> {
                String msg = new String(message.getBody(), "UTF-8");
                if("info5".equals(msg)){
                    System.out.println("消费者1接收消息是:" + msg+"这个是拒绝消息");
                    //第二个参数是不重新入队
                    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
                }
                System.out.println("消费者1接收消息是:" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }, consumerTag -> {});
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
  • 相关阅读:
    Jenkins 配置从节点
    CTFshow web48 49 50 51 52
    Exception in thread “AWT-EventQueue-0“ java.lang.NullPointerException
    Windows OpenGL 图像褐色
    java 实习面经 —— 含大厂
    Python学习笔记14:进阶篇(三)。类的终结篇,类的导入和模块的导入。
    js的常用的设计模式例子
    Three.js相机模拟
    邻接表转化为逆邻接表
    学点设计模式,盘点Spring等源码中与设计模式的那些事之创建型模式
  • 原文地址:https://blog.csdn.net/m0_51295655/article/details/128038099