• RabbitMQ:死信队列



    📃个人主页:不断前进的皮卡丘
    🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
    🔥个人专栏:消息中间件

    1.死信队列

    1.1死信队列基本介绍

    • 队列中不能被消费的消息称为死信队列
    • 有时候因为特殊原因,可能导致队列中的某些信息无法被消费,而队列中这些不能被消费的消息在后期没有进行处理,就会变成死信队列,死信队列中的消息称为死信
    • 应用场景:未来保证订单业务的消息数据不丢失,我们需要使用到RabbitMQ的死信队列机制,当消息消费发生异常的时候,我们就把消息投入到死信队列中,比如说用户买东西,下单成功后去支付,但是没有在指定时间支付的时候就会自动失效。
    • 死信队列,英文缩写:DLX 。DeadLetter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
    • 当消息在一个队列中变成死信后,它能被重新发布到另一个Exchange中,这个Exchange就是DLX
    • 在这里插入图片描述

    1.2消息成为死信的三种情况

    1. 队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
    2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
    3. 原队列存在消息过期设置,消息到达超时时间未被消费。

    1.3死信队列结构图

    通常情况下,消费者是能正常消费消息的,但是出现上面说的三种情况之一,就无法正常消费信息,消息就会进入死信交换机,死信交换机会和死信队列进行绑定,最后由其他消费者来消费死信消息。

    在这里插入图片描述

    1.4死信的处理方式

    死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
    ① 丢弃,如果不是很重要,可以选择丢弃
    ② 记录死信入库,然后做后续的业务分析或处理
    ③ 通过死信队列,由负责监听死信的应用程序进行处理
    综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理。
    队列绑定死信交换机:
    给队列设置参数:x-dead-letter-exchange 和x-dead-letter-routing-key
    在这里插入图片描述

    2.TTL消息过期时间

    2.1基本介绍

    当消息到达存活时间后,还没有被消费,就会被自动清除。RabbitMQ可以对消息或者队列设置过期时间,队列中的消息过期是成为死信队列的三种原因之一。
    在这里插入图片描述

    2.2生产者

    public class Producer {
        //正常交换机
        public static final String NORMAL_EXCHANGE = "normal_exchange";
        //正常队列
        public static final String NORMAL_QUEUE = "normal_queue";
    
        public static void main(String[] args) {
            try {
                Channel channel = ConnectUtil.getChannel();
                //声明交换机
                channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
                //声明队列
                //channel.queueDeclare(NORMAL_QUEUE, true, false, false, null);
                //把正常交换机和正常队列进行绑定
                //channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "tom");
                //设置过期时间
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    
                //发送消息
                for (int i = 0; i < 10; i++) {
                    String message = "消息:" + i;
                    //发送消息
                    channel.basicPublish(NORMAL_EXCHANGE, "tom", null, message.getBytes());
    
                }
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    2.3消费者1

    public class Consumer1 {
        //定义交换机(正常交换机,死信交换机)
        public static final String NORMAL_EXCHANGE = "normal_exchange";
        public static final String DEAD_EXCHANGE = "dead_exchange";
        //定义队列(正常队列,死信队列)
        public static final String NORMAL_QUEUE = "normal_queue";
        public static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) {
    
            try {
                //创建信道对象
                Channel channel = ConnectUtil.getChannel();
                //声明交换机
                channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
                channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
    
                //设置正常队列和死信队列进行绑定,key固定不可以改变
               Map<String, Object> map = new HashMap<>();
                map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
                map.put("x-dead-letter-routing-key", "jack");
                //声明正常队列
                channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
                //正常交换机绑定正常队列
                channel.queueBind(NORMAL_QUEUE,NORMAL_QUEUE,"tom");
                //声明死信队列
                channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
                //死信交换机绑定死信队列
                channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"jack");
                //消费消息
    
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                     * 消费回调函数,当收到消息以后,会自动执行这个方法
                     * @param consumerTag 消费者标识
                     * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                     * @param properties   属性信息
                     * @param body         消息数据
                     * @throws IOException
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
                    }
                };
                //监听消息(队列名称,是否自动确认消息,消费对象)
                channel.basicConsume(NORMAL_QUEUE, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    2.4消费者2

    public class Consumer2 {
        //定义交换机(死信交换机)
    
        public static final String DEAD_EXCHANGE = "dead_exchange";
        //定义队列(死信队列)
        public static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) {
    
            try {
                //创建信道对象
                Channel channel = ConnectUtil.getChannel();
                //声明死信队列
                channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
                //死信交换机绑定死信队列
                channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "jack");
    
                //消费消息
    
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                     * 消费回调函数,当收到消息以后,会自动执行这个方法
                     * @param consumerTag 消费者标识
                     * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                     * @param properties   属性信息
                     * @param body         消息数据
                     * @throws IOException
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
                    }
                };
                //监听消息(队列名称,是否自动确认消息,消费对象)
                channel.basicConsume(DEAD_QUEUE, true, consumer);
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    2.5设置TTL的两种方式

    2.5.1队列设置TTL

    在创建队列的时候设置队列的x-message-ttl属性,例如:

      Map<String, Object> map = new HashMap<>();
    //设置队列有效期为10秒
    map.put("x-message-ttl",10000);
    channel.queueDeclare(queueName,durable,exclusive,autoDelete,map);
    
    • 1
    • 2
    • 3
    • 4

    2.5.2消息设置TTL

    对每条消息设置TTL

      AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
     channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes());
    
    • 1
    • 2

    2.5.3区别

    如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃
    如果是消息设置了TTL属性,那么即使消息过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,那么已经过期的消息也许还能存活较长时间。

    如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃。

  • 相关阅读:
    jadx 反编译apk
    js小数计算丢失精度问题
    java计算机毕业设计基于springboot 口腔卫生防护口腔牙科诊所管理系统
    python图片合成
    深度学习21天——循环神经网络(RNN)实现股票预测 |(第9天)
    qt单例模式
    疫情可视化(后续)
    Mybatis中Resources和ClassLoaderWrapper
    消费电子 SIC462ED SIC463ED DC/DC 稳压器 参数 应用
    【Axure高保真原型】自适应多行输入框
  • 原文地址:https://blog.csdn.net/qq_52797170/article/details/127282842