• RabbitMQ------死信队列(消息超时、达到最大长度、消费拒绝)(六)


    RabbitMQ------死信队列(六)

    死信的概念

    死信:无法被消费的消息,一般情况下:生产者将消息投递到broker或者直接到queue中,消费者从queue取出消息进行消费,但是某些时候,由于特定原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信消息,有了死信消息就产生了死信队列
    当消息消费发生异常时,将消息投入死信队列中。比如:用户在商城下单成功并点击去支付后,在指定时间未支付时自动失效。

    死信的来源

    1.消息TTL(存活时间)过期
    2.队列达到最大长度(队列满了,无法再添加数据到mq中)
    3.消息被拒绝(basic.reject或basic.nack)并且requeue = false(不放回队列中)
    在这里插入图片描述
    示例代码:
    消费者1是最为重要的,需要声明交换机1,队列1,以及消费者2,队列2,并且需要当消息异常时,将死信消息转发到交换机2,再由交换机2转发到队列2.
    消费者2只需要消费队列2的消息。
    生产者需要向交换机1发送消息。
    消费者1:
    和以前不一样,这里需要在队列1,就是普通队列中,添加对死信的操作,因此为arguments参数进行了定义。

    /**
     * 死信
     * 消费者01
     */
    public class Consumer01 {
        //正常交换机
        public static  final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        public static  final String DEAD_EXCHANGE = "dead_exchange";
        //正常队列
        public static  final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        public static  final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //声明普通交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //声明死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //声明普通队列
            /**
             * 普通队列需要指定对应的参数
             * 才能够当消息成为死信后
             * 转发到死信交换机
             * 再发送到死信队列
             * arguments
             */
            Map<String, Object> arguments = new HashMap<>();
            //过期时间 10s  单位毫秒
            //也可以在生产者发送时,指定过期时间,一般使用生产者发送时指定
    //        arguments.put("x-message-ttl",10000);
            //正常队列过期后设置死信交换机 x-dead-letter-exchange 固定格式
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信交换机的routingkey
            arguments.put("x-dead-letter-routing-key","roukey2");
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            //绑定普通交换机与队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"roukey1");
    
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            //绑定死信交换机与死信队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"roukey2");
    
    
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("消费者01接收到的消息"+new String(message.getBody(),"UTF-8"));
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    生产者:
    生产者同样做了变动,设置了消息变为死信的过期时间,并且作为参数传递了进去。

    public class ProducerLog {
        public static  final String EXCHANGE_NAME = "normal_exchange";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //设置过期时间,变为死信 10s   单位毫秒
            AMQP.BasicProperties  properties = new AMQP.BasicProperties().
                    builder().expiration("10000").
                    build();
            for (int i = 0; i < 10; i++) {
                String message = "info:"+i;
                channel.basicPublish(EXCHANGE_NAME,"roukey1",properties,message.getBytes("UTF-8"));
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    消费者2
    只消费死信队列中的消息

    public class Consumer02 {
        //死信队列
        public static  final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("消费者02接收到的消息"+new String(message.getBody(),"UTF-8"));
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    消息达到过期时间未被消费,变为死信,再被消费者2消费的场景

    1.先启动消费者1,创建正常队列、死信队列、正常交换机、死信交换机
    2.再关闭消费者1,交换机和队列创建后,依然存活,模拟消费者1,未能及时消费
    3.启动生产者,生产者发送消息值正常交换机
    4.现象:正常队列内信息条数为10,过了10秒后,变为0,死信队列消息条数由0变为10.
    5.启动消费者2,死信队列消息被消费。

    达到最大队列长度场景

    需要在消费1,声明正常队列时,加入如下参数即可
    设置最大长度为6,超过6个,多余的就为死信,进入死信队列。
    生产者删除,最大存活时间的设置。
    注,之前队列已经创建,属性改变,之前队列需要删除

            //设置正常队列长度的限制
            arguments.put("x-max—letter",6);
    
    • 1
    • 2

    消息被拒绝场景

    需要在消费者1接收消息的回调函数中,进行改造,增加拒绝操作

            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                String mes = new String(message.getBody(), "UTF-8");
                if (mes.equals("info:5")){
                    System.out.println("消费者01拒绝消息"+mes);
                    /**
                     * 拒绝消息
                     * 消息的标签
                     * 是否放回原队列
                     */
                    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
                }else {
                    /**
                     * 应答消息
                     * 消息的标签
                     * 是否批量应答
                     */
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    //消息有消息头、消息体
                    System.out.println("消费者01接收到的消息"+mes);
                }
            };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
  • 相关阅读:
    云原生周刊:CNCF 宣布 Cilium 毕业 | 2023.10.16
    GO sync.Map Store、Delete 、Load 、Range等方法使用举例
    兴趣社如何搭建一个兴趣社区?
    Flask vs. Django:选择适合你的Web开发框架【第134篇—Flask vs. Django】
    Thrift : Python RPC的实践
    算法设计_综合练习_编程题
    NetSuite 关闭期间的销售订单可否修改
    通过Patch-Base来优化VSR中的时间冗余
    什么是IOS签名 180.188.22.X
    【PostgreSQL内核学习(十一)—— (CreatePortal)】
  • 原文地址:https://blog.csdn.net/cz_chen_zhuo/article/details/127823306