• 【RabbitMQ】——死信


    一、概念

      先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到 broker或者直接到queue里了,consumer 从 queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
      应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未古什时白动失效。

    二、来源

    1. 消息TTL过期(TTL 存活时长)
    2. 队列达到最大长度(队列满了,无法再添加数据到mq中)
    3. 消息被拒绝(basic.reject或 basic.nack)并且requeue=false。

    三、实现

    1. 设置一个普通交换机(normal_exchange)和一个普通个队列(normal_queue)。
    2. 设置一个死信交换机(dead_exchange)和一个死信个队列(dead_queue)。
    3. 当普通队列的消息成为死信消息时候,该消息就会转发到死信队列,进行消费。
    4. 核心实现在消费者01中,消费者01 创建了普通交换机和普通队列并进行绑定,创建死信交换机和死信队列并进行绑定,在普通队列中绑定死信交换机和死信队列。
    Map<String, Object> arguments = new HashMap<>();
    //设置 死信交换机
    arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
    //设置 死信routingKey
    arguments.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD);
    /**
     * 声明普通队列
     */
    channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    1. TTL过期

    过期时间单位ms ,消息过期时间有两种方式,一种是通过 生产者,另一种是通过消费者。
    通过生产者方式:

      AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); 
       channel.basicPublish(EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, properties, message.getBytes("UTF-8"));
    
    • 1
    • 2

    通过消费者方式:

      Map<String, Object> arguments = new HashMap<>();
     //第一种,消息过期类型 过期时间,单位毫秒 (建议从生产者端 设置过期时间,比较灵活)
     arguments.put("x-message-ttl", 10 * 1000);
     channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
    
    • 1
    • 2
    • 3
    • 4
    <1> 生产者01
    package com.rabbitmqDemo.rabbitmq.eight;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    public class Producer01 {
        private static final String EXCHANGE_NORMAL = "normal_exchange";
        private static final String ROUTING_KEY_NORMAL = "normal";
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
            /**
             * 死信消息 设置TTL时间
             * 单位毫秒
             */
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
    
                /**
                 * 发送消息
                 * param1 发送到哪个交换机
                 * param2 routingKey
                 * param3 其他参数信息
                 * param4 发送的消息体
                 */
                channel.basicPublish(EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, properties, message.getBytes("UTF-8"));
                System.out.println("message send end : " + message);
    
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    <2> 消费者01
    package com.rabbitmqDemo.rabbitmq.eight;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 死信队列实战
     */
    public class Consumer01 {
    
        private static final String EXCHANGE_NORMAL = "normal_exchange";
        private static final String EXCHANGE_DEAD = "dead_exchange";
    
        private static final String QUEUE_NORMAL = "normal_queue";
        private static final String QUEUE_DEAD = "dead_queue";
    
        private static final String ROUTING_KEY_NORMAL = "normal";
        private static final String ROUTING_KEY_DEAD = "dead";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
            /**
             * 声明正常交换机和死信交换机
             */
            channel.exchangeDeclare(EXCHANGE_NORMAL, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);
    
            /**
             * 普通队列参数
             */
            Map<String, Object> arguments = new HashMap<>();
            //第一种,消息过期类型 过期时间,单位毫秒 (建议从生产者端 设置过期时间,比较灵活)
            arguments.put("x-message-ttl", 10 * 1000);
            //设置 死信交换机
            arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
            //设置 死信routingKey
            arguments.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD);
            /**
             * 声明普通队列
             */
            channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
            /**
             * 声明死信队列
             */
            channel.queueDeclare(QUEUE_DEAD, false, false, false, null);
    
            /**
             * 绑定普通交换机队列 和 死信交换机队列
             * param1 队列名称
             * param2 交换机名称
             * param3 routingkey
             */
            channel.queueBind(QUEUE_NORMAL, EXCHANGE_NORMAL, ROUTING_KEY_NORMAL);
            channel.queueBind(QUEUE_DEAD, EXCHANGE_DEAD, ROUTING_KEY_DEAD);
    
            //声明 普通队列 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Consumer01-message:" + msg);
            };
            //声明 普通队列 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Consumer01-消息消费被中断-" + consumerTag);
            };
    
            /**
             * 普通 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("Consumer01等待接收消息......");
            channel.basicConsume(QUEUE_NORMAL, false, deliverCallback, cancelCallback);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    <3> 消费者02
    package com.rabbitmqDemo.rabbitmq.eight;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer02 {
       
        private static final String EXCHANGE_DEAD = "dead_exchange";
        private static final String QUEUE_DEAD = "dead_queue";
        private static final String ROUTING_KEY_DEAD = "dead";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
          
            channel.queueBind(QUEUE_DEAD, EXCHANGE_DEAD, ROUTING_KEY_DEAD);
    
            //声明 死信队列 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Consumer02-message:" + new String(message.getBody(), "UTF-8"));
            };
            //声明 死信队列 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Consumer02-消息消费被中断-" + consumerTag);
            };
    
            /**
             * 死信 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("Consumer02等待接收消息......");
            channel.basicConsume(QUEUE_DEAD, true, deliverCallback, cancelCallback);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    <4> 运行步骤

    第一步:先运行消费者01 创建普通交换机和死信交换机,然后结束消费者01进程,创造无法消费的条件。
    第二步:运行生产者01,此时消息会通过普通交换机发送到普通队列,TTL过期之后会通过死信交换机转移到死信队列。
    第三步:运行消费者02,会消费死信队列里的消息

    <5> 运行结果

    在这里插入图片描述

    2. 队列达到最大长度

    在上一个案例的基础上进行修改,将设置TTL的代码改为 设置队列最大长度即可

    /**
     * 普通队列参数
     */
    Map<String, Object> arguments = new HashMap<>();
    //第一种,消息过期类型 过期时间,单位毫秒 (建议从生产者端 设置过期时间,比较灵活)
    //arguments.put("x-message-ttl", 10 * 1000);
    //第二种,队列长度类型 设置正常队列长度限制
    arguments.put("x-max-length",6);
    //设置 死信交换机
    arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
    //设置 死信routingKey
    arguments.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD);
    /**
     * 声明普通队列
     */
    channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3. 消息被拒绝

    在上一个案例的基础上进行修改,将设置TTL和设置队列最大长度的代码隐藏,然后在普通队列 消费者成功消费的回调的方法中添加消息拒绝的代码。

    //声明 普通队列 消费者成功消费的回调
    DeliverCallback deliverCallback = (consumerTag, message) -> {
        //第三种,消息被拒绝类型
        String msg = new String(message.getBody(), "UTF-8");
        if ("rejectmessage".equals(msg)) {
            System.out.println("Consumer01-message-reject:" + msg);
            //拒接消息
            channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
        } else {
            System.out.println("Consumer01-message:" + msg);
    
        }
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    nginx反向代理location和proxy_pass的映射关系
    Docker数据卷&&自定义Docker镜像
    字节面试官:“这92道 Spring Boot 面试题都答不上来?”
    #21天学习挑战赛—深度学习实战100例#——动物识别
    qt开发-09_分裂器
    java计算机毕业设计高校选课系统源码+mysql数据库+系统+lw文档+部署
    【CSS基础】基础选择器+字体文本相关样式
    PHP将相对路径URL转换为绝对路径URL
    [数据集][目标检测]井盖未盖好检测数据集VOC+YOLO格式20123张2类别
    设计模式原则——接口隔离原则
  • 原文地址:https://blog.csdn.net/qq_42000631/article/details/126378844