• 4. 死信队列


    二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

    死信

    概念

    先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
    解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列

    应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ的死信队列机制,当消息
    消费发生异常时,将消息投入死信队列中.

    还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

    死信来源

    1. 消息 TTL 过期

    2. 队列达到最大长度(队列满了,无法再添加数据到mq中)

    3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

    image.png

    消息 TTL 过期

    生产者 设置 ttl

    public class DeadMessageProducer1 {
        private static String EXCHANGE_NAME = "normal_exchange";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            Channel channel = connection.createChannel();
            // 创建交换机, 交换机 名称是  logs
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
                    true, false, null);
    
            // 设置消息的 TTL 过期时间  10s
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                // routingKey 为 空
                String inputMessage = scanner.nextLine();
    
                // 设置过期时间
                channel.basicPublish(EXCHANGE_NAME, "zhangsan",
                        properties, inputMessage.getBytes("UTF-8"));
            }
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    正常交换机 绑定到死信交换机

    public class DeadMessageConsumer1 {
        private static String EXCHANGE_NAME = "normal_exchange";
        private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    
    
        private static String QUEUE_NAME = "normal_queue";
        private static String DEAD_QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
    
            //1. 创建死亡队列和绑定关系
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");
    
    
            //2. 创建活着的队列和绑定
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
    
            Map<String, Object> params = new HashMap();
    
            // 死信交换机 exchange
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
            // 死信交换机的 routing-key
            params.put("x-dead-letter-routing-key", "lisi");
    
            // 创建队列时, 绑定 死信队列的信息。
            channel.queueDeclare(QUEUE_NAME, true, false, false, params);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
    
            TimeUnit.SECONDS.sleep(8);
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    死信队列获取消息

    public class DeadMessageConsumer1_1 {
        private static String EXCHANGE_NAME = "dead_exchange";
        private static String QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lisi");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    验证

    先不启动 死信队列的消费者。

    先启动生产者,DeadMessageProducer1 再启动 绑定的死信交换机, DeadMessageConsumer1 启动好之后, 立即关闭 (创建绑定)。

    未发送消息之前:

    image.png

    再从生产者上发送3条消息 (此时 DeadMesageConsumer1 已经关闭)

    image.png

    等 10s 之后

    image.png

    启动死信队列消息

    image.png

    消息被消费

    image.png

    队列达到最大长度

    此时需要把原先队列删除 因为参数改变了

    生产者

    生产者代码去掉 TTL 属性,

    发送 10 条消息

    public class DeadMessageProducer2 {
        private static String EXCHANGE_NAME = "normal_exchange";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            Channel channel = connection.createChannel();
            // 创建交换机, 交换机 名称是  logs
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
                    true, false, null);
    
            for (int i = 0; i < 10; i++) {
                // 设置过期时间
                String message = "发送消息" + i;
                channel.basicPublish(EXCHANGE_NAME, "zhangsan",
                        null, message.getBytes("UTF-8"));
            }
            System.in.read();
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    交换机绑定到死信交换机, 设置长度

    只能获取6个

    public class DeadMessageConsumer2 {
        private static String EXCHANGE_NAME = "normal_exchange";
        private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    
    
        private static String QUEUE_NAME = "normal_queue";
        private static String DEAD_QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
    
            //1. 创建死亡队列和绑定关系
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");
    
    
            //2. 创建活着的队列和绑定
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
    
            Map<String, Object> params = new HashMap();
    
            // 死信交换机 exchange
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
            // 死信交换机的 routing-key
            params.put("x-dead-letter-routing-key", "lisi");
    
            // 设置正常队列的长度 是 添加的方法。
            // 需要先将队列删除,再重新构建。
            params.put("x-max-length", 6);
    
            // 创建队列时, 绑定 死信队列的信息。
            channel.queueDeclare(QUEUE_NAME, true, false, false, params);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
    
            TimeUnit.SECONDS.sleep(8);
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    死信队列获取消息 保持不变

    public class DeadMessageConsumer2_1 {
        private static String EXCHANGE_NAME = "dead_exchange";
        private static String QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lisi");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    验证

    操作流程与之前是一样的

    image.png

    消息被拒

    生产者 保持不变

    public class DeadMessageProducer3 {
        private static String EXCHANGE_NAME = "normal_exchange";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            Channel channel = connection.createChannel();
            // 创建交换机, 交换机 名称是  logs
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
                    true, false, null);
    
            for (int i = 0; i < 10; i++) {
                // 设置过期时间
                String message = "发送消息" + i;
                channel.basicPublish(EXCHANGE_NAME, "zhangsan",
                        null, message.getBytes("UTF-8"));
            }
            System.in.read();
            channel.close();
            connection.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    交换机绑定到死信交换机, 去掉长度,设置 手动提交

    public class DeadMessageConsumer3 {
        private static String EXCHANGE_NAME = "normal_exchange";
        private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    
    
        private static String QUEUE_NAME = "normal_queue";
        private static String DEAD_QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
    
            //1. 创建死亡队列和绑定关系
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");
    
    
            //2. 创建活着的队列和绑定
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
    
            Map<String, Object> params = new HashMap();
    
            // 死信交换机 exchange
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
            // 死信交换机的 routing-key
            params.put("x-dead-letter-routing-key", "lisi");
    
    
            // 创建队列时, 绑定 死信队列的信息。
            channel.queueDeclare(QUEUE_NAME, true, false, false, params);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
    
    
                String textMessage = new String(message.getBody());
    
                if ("发送消息5".equals(textMessage)) {
                    System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
                    // 确认这一个.
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                } else {
                    // 进行拒绝
                    // 拒绝重新入队, 设置了死信的话,就发送到死信队列里面。
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                }
    
    
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
    
            TimeUnit.SECONDS.sleep(8);
            // 设置不自动提交。
            channel.basicConsume(
                    QUEUE_NAME, false, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    死信队列获取消息 保持不变

    public class DeadMessageConsumer3_1 {
        private static String EXCHANGE_NAME = "dead_exchange";
        private static String QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lisi");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    验证

    image.png

    启动1,再启动2

    image.png

    只将5 拒收 , 死信队列 只获取到消息 5

    image.png

    image.png

    延时队列

    概念

    延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,
    延时队列中的元素是希望在指定时间到了以后或之前取出和处理,
    简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。

    延时队列使用场景

    1.订单在十分钟之内未支付则自动取消
    2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
    3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
    4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

    这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,
    如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;
    看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
    如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,
    如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,
    确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,
    短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,
    对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,
    同时会给数据库带来很大压力,无法满足业务要求而且性能低下

    image.png

    TTL

    TTL 是什么呢?TTL 是 RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有
    消息的最大存活时间, 单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL

    队列设置 TTL

    在创建队列的时候设置队列的“x-message-ttl”属性

    image.png

    消息设置 TTL

    另一种方式便是针对每条消息设置 TTL

    image.png

    区别

    如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队
    列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者 之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃。

    前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ实现延时队列的两大要素已
    经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延 时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面, 成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为 里面的消息都是希望被立即处理的消息。

  • 相关阅读:
    keycloak~对框架中提供的Provider总结
    全解MySQL之各方位事无巨细的剖析存储过程与触发器!
    Linux环境变量
    Metabase学习教程:视图-5
    led护眼灯真的能护眼吗?Led护眼灯的好处
    阿斯达年代记三强争霸服务器没反应 安装中发生错误的解决方法
    【无标题】
    量化交易:开发传统趋势策略之---双均线策略
    阿里云武林头条活动分享
    雷电模拟器上使用第一个frida(一)之安装
  • 原文地址:https://blog.csdn.net/yjltx1234csdn/article/details/128170834