• RabbitMq死信队列


    RabbitMq死信队列

    代码中获取信道可参考 https://www.cnblogs.com/zjh0420/p/16891557.html

    死信的概念

    死信:顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

    应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
    消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时
    间未支付时自动失效。

     

    死信的来源

    • 消息 TTL 过期

    • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)

    • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

     

    死信实战

     

    消息TTL过期 TTl:time to live

    生产者代码

    highlighter- Go
    /**
     * 生产者
     */
    public static void setMessageOverdueProducer() throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            // 死信消息 设置TTL时间  TTl:time to live
            AMQP.BasicProperties properties = new AMQP.BasicProperties()
                    .builder().expiration("10000").build();
    
            // 循环次数
            long length = 11L;
    
            for (int i = 1; i < length; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes(StandardCharsets.UTF_8));
            }
        }

    普通消费者代码

    highlighter- Go
    /**
     * @author zjh
     *
     * 普通消费者
     */
    public class ConsumerOne {
    
        /**
         * 普通交换机
         */
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        /**
         * 死信交换机
         */
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        /**
         * 普通队列名称
         */
        private static final String NORMAL_QUEUE = "normal_queue";
    
        /**
         * 死信队列名称
         */
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            // 声明死信和普通交换机 类型为DIRECT
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 声明普通队列 类型为DIRECT
            Map arguments = new HashMap<>(8);
            // 正常队列设置死信交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 设置死信routingKey
            arguments.put("x-dead-letter-routing-key", "dead");
    
            channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
    
            // 声明死信 类型为DIRECT
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    
            // 绑定普通交换机与普通队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
            // 绑定死信交换机与死信队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
            System.out.println("等待接收消息...");
    
            // 接收消息回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ConsumerOne接收消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
            };
    
            channel.basicConsume(NORMAL_QUEUE, false, deliverCallback,
                    consumerTag -> {});
        }
    }

    死信消费者代码

    highlighter- Go
    /**
     * @author zjh
     *
     * 死信队列 消费者
     */
    public class ConsumerTwo {
    
        /**
         * 死信队列名称
         */
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            System.out.println("等待接收消息...");
    
    
            channel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> System.out.println("ConsumerTwo接收消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)),
                    consumerTag -> {});
        }
    }

    测试方法步骤

    1. 首先先启动 ConsumerOne 进行声明交换机和队列

    2. 声明好后将 ConsumerOne 停止,因为测试消费TTL过期设置了10s,所以将消费者停止 消息无人消费就会进入死信队列

    3. 启动 Producer 发送消息,可以看到正常队列有十条消息等待消费

    4. 等待十秒钟可以看到十条消息都进入死信队列了

    5. 在启动 ConsumerTwo 消费死信队列

     
     

    队列达到最大长度

    生产者代码

    跟上放 消息TTL过期 生产者差不多,去掉了过期时间

    highlighter- Go
    /**
     * 生产者
     */
    public static void normalProducer() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
    
        // 循环次数
        long length = 11L;
    
        for (int i = 1; i < length; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "normal", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }

    普通消费者代码

    跟上放 消息TTL过期 的消费者代码其实一样只不过多了 arguments.put("x-max-length", 6); 参数设置队列长度

    highlighter- Go
    /**
     * @author zjh
     *
     * 普通消费者
     */
    public class ConsumerOne {
    
        /**
         * 普通交换机
         */
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        /**
         * 死信交换机
         */
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        /**
         * 普通队列名称
         */
        private static final String NORMAL_QUEUE = "normal_queue";
    
        /**
         * 死信队列名称
         */
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            // 声明死信和普通交换机 类型为DIRECT
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 声明普通队列 类型为DIRECT
            Map arguments = new HashMap<>(8);
            // 正常队列设置死信交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 设置死信routingKey
            arguments.put("x-dead-letter-routing-key", "dead");
            // 设置正常队列的长度限制
            arguments.put("x-max-length", 6);
            channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
    
            // 声明死信 类型为DIRECT
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    
            // 绑定普通交换机与普通队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
            // 绑定死信交换机与死信队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
            System.out.println("等待接收消息...");
    
    
            channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> System.out.println("ConsumerOne接收消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)),
                    consumerTag -> {});
        }
    }

    死信消费者代码

    跟上方 消息TTL过期 的死信消费者代码一致

    测试方法步骤

    1. 先把原来声明的队列删除,在启动 ConsumerOne 进行声明交换机和队列。
    2. 再把 ConsumerOne 给停止,防止普通队列被消费了。
    3. 启动 Producer 进行发送消息, 因为普通消费者和死信消费者都没启动所以可以看到队列中的数据条数。
    4. 最后启动 ConsumerOne、ConsumerTwo 把队列都消费掉。

     
     

    消息被拒

    生产者代码

    跟上方 队列达到最大长度 的生产者代码一致

    普通消费者代码

    模拟拒绝接收指定消息:在下方接收消息回调中加了一个判断, 等于info5的消息拒收,并且不放回队列重新消费,让其变为死信。

    highlighter- Go
    /**
     * @author zjh
     *
     * 普通消费者
     */
    public class ConsumerOne {
    
        /**
         * 普通交换机
         */
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        /**
         * 死信交换机
         */
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        /**
         * 普通队列名称
         */
        private static final String NORMAL_QUEUE = "normal_queue";
    
        /**
         * 死信队列名称
         */
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            // 声明死信和普通交换机 类型为DIRECT
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 声明普通队列 类型为DIRECT
            Map arguments = new HashMap<>(8);
            // 正常队列设置死信交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 设置死信routingKey
            arguments.put("x-dead-letter-routing-key", "dead");
    
            channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
    
            // 声明死信 类型为DIRECT
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    
            // 绑定普通交换机与普通队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
            // 绑定死信交换机与死信队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
            System.out.println("等待接收消息...");
    
            // 接收消息回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                // 指定 info5 消息拒收
                if ("info5".equals(msg)) {
                    System.out.println("ConsumerOne接收消息是:" + msg + ", 此消息是被拒收的!");
                    // 不放回队列 成为死信
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                } else {
                    System.out.println("ConsumerOne接收消息是:" + msg);
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            };
    
            channel.basicConsume(NORMAL_QUEUE, false, deliverCallback,
                    consumerTag -> {});
        }
    }

    死信消费者代码

    跟上方 消息TTL过期 的死信消费者代码一致

    测试方法步骤

    1. 先把原来声明的队列删除,在启动 ConsumerOne 进行声明交换机和队列
    2. 启动 Producer 进行发送消息, 可以在 ConsumerOne 控制台中看到打印的信息。
    3. 查看 RabbitMq Management 中死信队列是否存在一条消息,可以看到已经存在了。
    4. 可以进入队列中查看消息具体样子,确认是否是我们所拒收的 进入队列内部 点击Get-Messages -> GetMessage(s)。
    5. 最后启动死信消费者,将死信队列中的数据消费掉

    __EOF__

  • 本文作者: 小米粥
  • 本文链接: https://www.cnblogs.com/zjh0420/p/16893337.html
  • 关于博主: 一个02年的程序猿...
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    Spark集群配置Hive
    无代码平台助力中山医院搭建“智慧化管理体系”,实现智慧医疗
    Java电子招投标采购系统源码-适合于招标代理、政府采购、企业采购、等业务的企业
    [Database] MySQL 8.x Window / Partition Function (窗口/分区函数)
    2022微服务面试题 最新50道题(含答案解析)
    jeecg中j-vxe-table和j-popup组件的修改使用
    CSS 高阶小技巧 - 角向渐变的妙用!
    “探寻服务器的无限潜能:从创意项目到在线社区,你会做什么?”
    vlog常用参数解析
    spring boot 整合 shiro 框架
  • 原文地址:https://www.cnblogs.com/zjh0420/p/16893337.html