• RabbitMQ(六) 延迟消息队列


    一. 什么时候需要延迟队列

    例子:

    1.电商项目中,当我们下单后,一般需要二十分之内或者三十分钟之内付款,否则订单就会进入异常逻辑中被取消,那么进入异常处理逻辑中,就可以当成是一个延迟队列。
    2.公司的会议预定系统,在会议预定成功后,会在会议开始前半个小时通知所有预定该会议的用户。
    3.安全工单超过24h未处理,则自动拉企业微信群提醒相关责任人。
    4.用户下单外卖后,距离超时时间还有10min提醒外卖小哥即将超时。
    5.我买了一个智能砂锅,可以预定时间煮饭,这也是一个延迟任务,放到一个延迟队列中,时间到了再执行。

    注意::搞一个定时任务也可以解决以上场景。但是,如果项目中有很多这样的场景,那么定时任务很明显不是最佳方案,我们可以通过延迟队列来实现一个通用的解决方案

    二.延迟队列实现思路:

    DLX(死信交换机)+TTL(消息超时时间)
    即我们可死信队列当成延迟队列。

    具体操作流程:

    假如一条消息需要延迟30min执行,我们就设置这条消息的有效期为30min,同时为这条消息配置死信交换机和死信,并且不为这个消息队列设置消费者,那么30min后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在蹲点这个死信队列,消息一进入死信队列,就立马被消费。

    三.小例子

    3.1 依赖

    在这里插入图片描述

    3.2 application

    spring.rabbitmq.host=localhost
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
    
    • 1
    • 2
    • 3
    • 4

    3.3Config配置类

    配置原理:

    1.配置分为两组,第一组配置死信队列 第二组配置普通队列。每一组都由消息队列,消息交换机以及Binging组成
    2.配置消息队列时,为消息队列指定死信队列,
    3.配置队列中的消息过期时间时,默认的时间单位时毫秒

    
    package yanchi.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class QueueConfig {
        public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
        public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
        public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
        //死信队列
        public static final String DLX_QUEUE_NAME = "dlx_queue_name";
        //死信交换机
        public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
        //routingkey
        public static final String DLX_ROUTING_KEY = "dlx_routing_key";
    
        /*** 死信队列 * @return */
        @Bean
        Queue dlxQueue() {
            return new Queue(DLX_QUEUE_NAME, true, false, false);
        }
    
    
        /*** 死信交换机 * @return */
        @Bean
        DirectExchange dlxExchange() {
            return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
        }
    
    
        /*** 绑定死信队列和死信交换机 * @return */
        @Bean
        Binding dlxBinding() {
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
        }
    
        /*** 普通消息队列 * @return */
        @Bean
        Queue javaboyQueue() {
            Map<String, Object> args = new HashMap<>();
            //设置消息过期时间
            args.put("x-message-ttl", 1000 * 10);
            // 设置死信交换机
            args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
            //设置死信routing_key
            args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args);
        }
    
    
        /*** 普通交换机 * @return */
        @Bean
        DirectExchange javaboyExchange() {
            return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
        }
    
    
        /*** 绑定普通队列和与之对应的交换机 * @return */
        @Bean
        Binding javaboyBinding() {
            return BindingBuilder.bind(javaboyQueue()).to(javaboyExchange()).with(JAVABOY_ROUTING_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    3.4在消费者类中为死信队列配置一个消费者:

    @Component
    public class DlxConsumer {
        private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);
    
        @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
        public void handle(String msg) {
            logger.info(msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.5测试类

     @Test
        void contextLoads1() {
            System.out.println(new Date());
            rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    四. 插件实现延迟队列

    4.1 安装插件

    插件地址-github开源

    先看好自己的版本,我这里是3.10所所以我下3.10的版本
    注意:版本非常重要,不对的版本你没办法运行

    在这里插入图片描述

    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez

    在这里插入图片描述

    这样我们就安装好了这个插件。

    接下来我们就进入RabbitMQ容器里面去安装这个插件:

    1.先看一下你的RabbitMQ容器是叫什么
    docker -pa -a 查看你容器的名字在这里插入图片描述
    2.
    我要用名为rabbitMQ的这个RabbitMQ容器,所有我们进入他
    docker exec -it rabbitMQ /bin/bash
    在这里插入图片描述
    3.用rabbitmq-plugins list查看我们RabbitMQ中已经安装的插件
    在这里插入图片描述
    4.exit退出容器

    5.把我们刚才下载的插件拷贝到容器里面去
    容器中有一个plugins文件夹专门用来存放拷贝的插件的
    在这里插入图片描述
    6. 执行 docker cp ./rabbitmq_delayed_message_exchange-3.10.2.ez rabbitMQ:/plugins把刚才下载的插件拷贝到我们的名为rabbitMQ的文件中。在这里插入图片描述
    7.拷贝进来后我们再次进入容器,并且再次查看插件
    但是发现这个延迟交换机并没有启用,所以下面我们去启用这个交换机,要先退出。
    8.我们去启用这个交换机
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    enable 后面的是这个交换机的名字
    在这里插入图片描述
    9.发现交换机启用了
    在这里插入图片描述

    4.2 搭建springBoot项目

    4.2.1 依赖

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    4.2.2 application

    spring.rabbitmq.host=1.12.235.192
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    server.port=8089
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4.2.3 配置类DelayedMsgConfig

     * 这里主要是交换机的定义有所不同,小伙伴们需要注意。
     * ``这里我们使用的交换机是 CustomExchange``,这是一个 Spring 中提供的交换机,创建
     * CustomExchange 时有五个参数,含义分别如下:
     * 交换机名称。
     * 交换机类型,这个地方是固定的。
     * 交换机是否持久化。
     * 如果没有队列绑定到交换机,交换机是否删除。
     * 其他参数
     * 最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、
     * topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    @Configuration
    public class DelayedMsgConfig {
        public static final String DELAYED_MSG_QUEUE_NAME = "delayed_msg_queue_name";
        public static final String DELAYED_MSG_EXCHANGE_NAME = "delayed_msg_exchange_name";
        public static final String DELAYED_MSG_EXCHANGE_TYPE = "x-delayed-message";
    
        @Bean
        Queue delayedQueue() {
            return new Queue(DELAYED_MSG_QUEUE_NAME, true, false, false);
        }
    
        /**
         * 这里主要是交换机的定义有所不同,小伙伴们需要注意。
         * 这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建
         * CustomExchange 时有五个参数,含义分别如下:
         * 交换机名称。
         * 交换机类型,这个地方是固定的。
         * 交换机是否持久化。
         * 如果没有队列绑定到交换机,交换机是否删除。
         * 其他参数
         * 最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、
         * topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
         * @return
         */
        @Bean
        CustomExchange delayedCustomExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_MSG_EXCHANGE_NAME, DELAYED_MSG_EXCHANGE_TYPE, true, false, args);
        }
    
        @Bean
        Binding delayedBinding() {
            return BindingBuilder.bind(delayedQueue())
                    .to(delayedCustomExchange())
                    .with(DELAYED_MSG_QUEUE_NAME).noargs();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    4.2.4 创建消费者DelayedMsgConsumer

    @Component
    public class DelayedMsgConsumer {
        private static final Logger logger = LoggerFactory.getLogger(DelayedMsgConsumer.class);
    
        @RabbitListener(queues = DelayedMsgConfig.DELAYED_MSG_QUEUE_NAME)
        public void handleMsg(byte[] msg) {
            logger.info("{}", new String(msg, 0, msg.length));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    4.2.5 测试类

      @Test
        void test07() {
            Message msg = MessageBuilder.withBody(("hello:" + new Date()).getBytes()).setHeader("x-delay", 3000).build();
            rabbitTemplate.send(DelayedMsgConfig.DELAYED_MSG_EXCHANGE_NAME, DelayedMsgConfig.DELAYED_MSG_QUEUE_NAME, msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.2.6 测试结果

    结果都是延迟三秒,消息延迟实现了
    在这里插入图片描述

  • 相关阅读:
    Mybatis—SqlSource与BoundSql
    最专业 大话C#之WPF业务场景入门和进阶,深入浅出解析章节教程 17 UI层树形结构布局入门
    Win11更改系统文件夹的默认保存位置方法分享
    【HTML】HTML网页设计--智能养老系统前端
    数仓4.0(一)
    C++ 炼气期之算术运算符
    牛视系统源码定制开发,抖音矩阵系统定制开发。
    android EventBus
    MyBatis注解开发
    结构化机器学习策略
  • 原文地址:https://blog.csdn.net/weixin_43189971/article/details/126279443