• rabbitMQ学习-死信队列


    死信队列

    死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成死信,有死信自然就有死信队列。

    一般应用场景:为了保证订单业务中的消息数据不丢失,需要使用rabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入死信队列中,还有比如说:用户在商场下单成功并点击去支付后在指定时间未支付时自动失效。

    死信的来源

    • 消息TTL过期
    • 队列达到最大长度(队列满了,无法再添加数据到mq中)
    • 消息被拒绝(basic.reject或basic.nack)并且requeue= false;

    死信实战

    在这里插入图片描述

    消息TTL过期
    在这里插入图片描述

    /*
    死信队列
     之生产者代码
     */
    public class Product {
        //普通交换机的名称
        public static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
            //死信消息,设置TTL时间
            AMQP.BasicProperties properties =
                    new AMQP.BasicProperties()
                            .builder().expiration("10000").build();
    
            for (int i = 0; i < 11; i++) {
                String message = "info "+ i;
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    /*
    死信
        消费者
     */
    public class Consumer01 {
    
        //普通交换机的名称
        private static  final  String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机的名称
        private  static  final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列名称
        private static  final  String NORMAL_QUEUE = "normal_queue";
        //死信队列名称
        private static  final  String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
    
            //声明死信和普通交换机类型 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
    
            // 普通队列
            Map<String,Object> argument = new HashMap<>();
            //过期时间10秒过期   可以不设置,可以交给队列自动去
           // argument.put("x-message-ttl",100000);
            //正常队列设置死信交换机
            argument.put("x-dead-lettle-exchange",DEAD_EXCHANGE);
            //设置死信RoutingKey
            argument.put("x-dead-lettle-routing-key","lisi");
    
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);
    
            ///
            //声名死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机和普通队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
            //绑定死信的交换机与死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
            System.out.println("等待接受......");
    
            //绑定死信的交换机和死信的对列
            DeliverCallback deliverCallback = (consumer,message)->{
                System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
            };
    
            channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumer->{});
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    public class Consumer02 {
    
        //死信队列名称
        private static  final  String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
    
            System.out.println("等待接受......");
    
            //绑定死信的交换机和死信的对列
            DeliverCallback deliverCallback = (consumer,message)->{
                System.out.println("Consumer 02接受到消息是:" + new String(message.getBody(),"UTF-8"));
            };
    
            channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumer->{});
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    解决进不去死信队列的原因了,就是我没给他把过期时间值传进去,导致出现问题。

    队列达到最大长度
    消息生成者代码去掉TTL属性。

    public class Consumer01 {
    
        //普通交换机的名称
        private static  final  String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机的名称
        private  static  final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列名称
        private static  final  String NORMAL_QUEUE = "normal_queue";
        //死信队列名称
        private static  final  String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
    
            //声明死信和普通交换机类型 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
    
            // 普通队列
            Map<String,Object> argument = new HashMap<>();
            //过期时间10秒过期   可以不设置,可以交给队列自动去
           // argument.put("x-message-ttl",10000);
            //正常队列设置死信交换机
            argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信RoutingKey
            argument.put("x-dead-letter-routing-key","lisi");
    
            //设置正常队列的长度的限制
            argument.put("x-max-length",6);
    
    
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);
    
            ///
            //声名死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机和普通队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
            //绑定死信的交换机与死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
            System.out.println("等待接受......");
    
            //绑定死信的交换机和死信的对列
            DeliverCallback deliverCallback = (consumer,message)->{
                System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
            };
    
            channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumer->{});
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    public class Product {
        //普通交换机的名称
        public static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
            //死信消息,设置TTL时间 单位是ms
         //   AMQP.BasicProperties properties =
          //          new AMQP.BasicProperties()
                            //.builder().expiration("10000").build();
    
            for (int i = 0; i < 11; i++) {
                String message = "info "+ i;
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
                System.out.println("生产者发送消息:"+message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述
    消息被拒绝
    一旦消费者拒绝接受,就会成为死信队列。

    public class Product {
        //普通交换机的名称
        public static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
            //死信消息,设置TTL时间 单位是ms
          //演示长度  AMQP.BasicProperties properties =
             //   new AMQP.BasicProperties()
               //             .builder().expiration("10000").build();
    
            for (int i = 0; i < 11; i++) {
                String message = "info "+ i;                        //properties
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
                System.out.println("生产者发送消息:"+message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    public class Consumer01 {
    
        //普通交换机的名称
        private static  final  String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机的名称
        private  static  final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列名称
        private static  final  String NORMAL_QUEUE = "normal_queue";
        //死信队列名称
        private static  final  String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
    
            //声明死信和普通交换机类型 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
    
            // 普通队列
            Map<String,Object> argument = new HashMap<>();
            //过期时间10秒过期   可以不设置,可以交给队列自动去
           // argument.put("x-message-ttl",10000);
            //正常队列设置死信交换机
            argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信RoutingKey
            argument.put("x-dead-letter-routing-key","lisi");
    
            //设置正常队列的长度的限制
          //演示拒绝  argument.put("x-max-length",6);
    
    
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);
    
            ///
            //声名死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机和普通队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
            //绑定死信的交换机与死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
            System.out.println("等待接受......");
    
            //绑定死信的交换机和死信的对列
            DeliverCallback deliverCallback = (consumer,message)->{
           //拒绝消息在这里写
                String message1 = new String(message.getBody(),"UTF-8");
               //指定你要拒绝的消息
                if(message1.equals("info 5")){
                    System.out.println("此消息是被拒绝的"+ message1);
                    //拒接接收,且不放回队列,避免队列重新发送
                    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
                }else {
    
                    System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
                   //   接收且不放回
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
    
            };
            //开启手动应答,                      //true是自动应答。
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumer->{});
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    public class Consumer02 {
    
        private  static  final String DEAD_EXCHANGE = "dead_exchange";
    
        //死信队列名称
        private static  final  String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
            System.out.println("等待接受......");
    
            //绑定死信的交换机和死信的对列
            DeliverCallback deliverCallback = (consumer,message)->{
                System.out.println("Consumer 02接受到消息是:" + new String(message.getBody(),"UTF-8"));
            };
            channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumer->{});
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    【文件IO的简单实现】
    Android 需要动态申请的权限和普通权限
    ggplot2图形简单绘制
    怎么写出美观,可读性高的代码?
    C语言 之 多线程编程
    今日论文阅读2022-11-10
    JVM的原理与性能
    C++之浅拷贝、深拷贝、拷贝构造函数、拷贝赋值运算符、自定义的深拷贝函数应用总结(二百二十九)
    MySQL数据库 #2
    C++实现坦克大战(超详细)(文末附源码!!!)
  • 原文地址:https://blog.csdn.net/qq_45922256/article/details/127797219