• RabbitMQ系列【11】延迟队列


    有道无术,术尚可求,有术无道,止于术。

    前言

    延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

    需求场景: 用户下单后,30分钟未支付,取消订单,回滚库存。

    本文介绍了三种方式实现,前两种存在一定的局限性

    1. 过期消息实现延迟队列

    发送带有TTL过期属性的消息,到达过期时间后,投递到死信队列,实现延迟队列功能。

    添加一个死信交换机、死信队列:

    @Configuration
    public class RabbitMqDeadQueueConfig {
    
        private static final String DEAD_QUEUE = "deadQueue";
    
        private static final String DEAD_EXCHANGE = "deadExchange";
    
        private static final String DEAD_ROUTE_KEY = "dead.key";
    
        /**
         * 死信队列
         */
        @Bean(DEAD_QUEUE)
        public Queue deadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        /**
         * 死信交换机
         */
        @Bean(DEAD_EXCHANGE)
        public Exchange deadExchange() {
            return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
        }
    
        /**
         * 创建死信队列和死信交换机的绑定关系
         */
        @Bean("deadBinding")
        public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
            return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    添加一个延迟交换机、延迟队列:

    @Configuration
    public class RabbitMqDelayQueueConfig {
    
    
        private static final String DEAD_EXCHANGE = "deadExchange";
    
        private static final String DEAD_ROUTE_KEY = "dead.key";
    
        /**
         * 使用 ExchangeBuilder 创建交换机
         */
        @Bean("delayExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.directExchange("delayExchange").durable(true).build();
        }
    
        /**
         * 创建队列:
         * 指定死信交换机、路由KEY
         */
        @Bean("delayQueue")
        public Queue bootQueue001() {
            return QueueBuilder.durable("delayQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
        }
    
        /**
         * 创建绑定关系
         */
        @Bean("delayBinding")
        public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
            return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").and(null);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    创建一个消费者,接收死信消息:

    @Component
    public class RabbitConsumer {
    
        @RabbitListener(queues = {"deadQueue"})
        public void deadQueue(Message message) {
            System.out.println("收到消息" + new String(message.getBody()));
            System.out.println("当前时间:"+ LocalDateTime.now());
            System.out.println("判断订单状态...." + new String(message.getBody()));
            System.out.println("未支付,回滚数据库....");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    发送TTL订单消息:

            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("10000"); // 10秒过期
            System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
            Message message = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
            rabbitTemplate.send("delayExchange", "delay.key", message);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    运行程序,可以看到,过了10秒,收到了订单信息~
    在这里插入图片描述

    但是该方式存在一个致命缺陷,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。 每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列。RabbitMQ是等消息到达队列顶部即将被消费时,才会判断其是否过期并删除。所以即使消息过期,也不会马上从队列中抹去。

    首先发送一个过期时间为20秒的消息,再发送一个过期时间为10秒的消息:

            // 发送过期时间为20秒的消息,先到达队列
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("20000"); // 20秒过期
            System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
            Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
            rabbitTemplate.send("delayExchange", "delay.key", message001);
            // 发送过期时间为10秒的消息,后达队列
            messageProperties.setExpiration("10000"); // 20秒过期
            System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
            Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
            rabbitTemplate.send("delayExchange", "delay.key", message002);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    测试发现,第二条过期时间为10秒的消息,虽然过期时间更短,但也需要等到第一条过期后,到达消息顶部,才会被扫描是否过期,由此可见, 过期消息实现延迟队列并不可取~~~
    在这里插入图片描述

    2. 过期队列实现延迟队列

    实现思路:

    1. 用户下单后,生成订单,发送订单消息到延迟队列中,并设置过期时间为30分钟,该队列没有消费者
    2. 订单队列消息过期后,发送订单至死信队列
    3. 死信队列消费者接收到消息后,判断订单状态,进行后续操作
      在这里插入图片描述

    给整个队列添加过期时间实现延迟队列。由于过期时间作用于整个队列,所以不是很灵活,比如设置30分钟需要一个队列,设置10分钟时,又需要创建一个队列。

    创建一个过期时间队列:

        @Bean("delayQueue")
        public Queue delayQueue() {
            return QueueBuilder.durable("delayQueue").withArgument("x-message-ttl", 10000).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
        }
    
    • 1
    • 2
    • 3
    • 4

    参照上面的案例发送消息即可。

    3. 插件实现延迟队列

    上面我们讨论了两种实现延迟队列的方式,但是都存在一些问题,官网也提供了基于插件的方式来实现。

    消息到达延迟交换机后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。

    在这里插入图片描述

    3.1 安装插件

    官网中提供了很多插件,以满足更多的功能需求。

    GitJHub中下载延迟插件:

    在这里插入图片描述
    将其放在RabbitMQ程序主目录的plugins下:

    在这里插入图片描述
    切换到sbin目录下,运行安装插件命令:

    rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
    
    • 1

    成功安装提示:

    在这里插入图片描述

    重启RabbitMQ,进入到交换机页面,添加交换机,可以看到一个新的类型为x-delayed-message,说明插件安装成功:
    在这里插入图片描述

    3.2 代码实现

    首先创建x-delayed-message类型的交换机,并绑定队列:

    @Configuration
    public class RabbitMqDelayQueueConfig {
    
        @Bean("delayExchange")
        public CustomExchange delayExchange() {
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
        }
    
        /**
         * 创建队列:
         * 指定死信交换机、路由KEY
         */
        @Bean("delayQueue")
        public Queue delayQueue() {
            return QueueBuilder.durable("delayQueue").build();
        }
    
        /**
         * 创建绑定关系
         */
        @Bean("delayBinding")
        public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
            return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    创建消费者,监听延迟队列:

        @RabbitListener(queues = {"delayQueue"})
        public void deadQueue(Message message) {
            System.out.println("收到消息" + new String(message.getBody()));
            System.out.println("当前时间:"+ LocalDateTime.now());
            System.out.println("判断订单状态...." + new String(message.getBody()));
            System.out.println("未支付,回滚数据库....");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    创建生产者,发送不同延迟时间的消息:

            // 发送延迟时间为20秒的消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setDelay(20000); // 20秒过期
            System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
            Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
            rabbitTemplate.send("delayExchange", "delay.key", message001);
            // 发送延迟时间为10秒的消息
            messageProperties.setDelay(10000); // 10秒过期
            System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
            Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
            rabbitTemplate.send("delayExchange", "delay.key", message002);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.3 测试

    可以看到第一条消息和第二条消息,都是在各自指定的延迟时间后被消费,并没有出现时序问题,而且每个消息都具有不同的延迟时间,灵活性很高。

    所以插件是实际应用中常用的一种方式,要实现延迟队列功能,当前其他MQ,甚至Redis也可以~~~
    在这里插入图片描述

  • 相关阅读:
    Graphviz代码流程图-智慧交通结构图
    SpringCloud - 服务调用
    SpringBean的生命周期
    Codesys数据类型(2.7):扩展数据类型之 别名 详解
    Java中Map架构简介说明
    ARM64-内嵌汇编
    从无栈协程到 C++异步框架
    11.10~11.15置信区间,均值、方差假设检验,正态,t,卡方,F分布,第一第二类错误
    Nginx 解析漏洞
    「地埋式积水监测站」智慧防汛,科技先行
  • 原文地址:https://blog.csdn.net/qq_43437874/article/details/127779664