• 通过rabbitmq生成延时消息,并生成rabbitmq镜像


    整体描述

    1. 使用场景

    在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。

    2. 目前问题

    之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。

    3. 前期准备

    需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…

    具体步骤

    为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。

    1. 拉取镜像

    首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:

    docker pull rabbitmq:3.8.17-management
    
    • 1

    注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。

    2. 运行镜像

    拉取成功之后,使用命令:

    docker images
    
    • 1

    查看镜像是否拉取成功,如下就是成功了:
    rabbitmq镜像
    之前用的3.6.8的,不支持延时消息队列的插件…
    然后运行镜像,创建容器并启动:

    docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management
    
    • 1

    此时用:

    docker ps -a
    
    • 1

    查看容器:
    rabbitmq容器
    容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
    默认账号:guest,密码:guest
    rabbitmq登录页面

    3. 安装插件

    此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
    选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
    上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1

    执行之后看到如下就成功了:
    插件启动成功
    成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
    添加延时交换机
    此时,我们的rabbitmq就配置完成了。

    4. 代码支持

    rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。

    4.1 config文件

    package com.thcb.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RabbitMQ的配置类
     *
     * @author thcb
     * @date 2023-09-05
     */
    @Configuration
    public class RabbitMqConfig {
    
        // 交换机
        private static final String DELAYED_EXCHANGE = "delayed.exchange";
    
        // 队列
        private static final String DELAYED_QUEUE = "delayed.queue";
    
        // 路由
        private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
    
        /**
         * 队列
         */
        @Bean
        public Queue delayedQueue() {
            return new Queue(DELAYED_QUEUE);
        }
    
        /**
         * 交换机
         */
        @Bean
        public CustomExchange delayedExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);
        }
    
        /**
         * 绑定延迟队列和交换机
         */
        @Bean
        public Binding delayQueueBindingDelayExchange() {
            return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    4.2 消费监听

    package com.thcb.rabbitmq.recevier;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 消费监听
     *
     * @author thcb
     * @date 2023-09-05
     */
    @Slf4j
    @Component
    public class DelayQueueReceiver {
    
        @RabbitListener(queues = "delayed.queue")
        public void receiveDelayedQueue(Message message) {
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    4.2 消息生产

    这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。

    package com.thcb.rabbitmq.controller;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     * 消息生产controller
     *
     * @author thcb
     * @date 2023-09-05
     */
    @RestController
    @RequestMapping("/HelloController")
    public class HelloController {
    
        private static final Logger log = LoggerFactory.getLogger(HelloController.class);
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        @RequestMapping("/sendXDLMessage1")
        @ResponseBody
        public String sendXDLMessage1() {
            int time = 5000;
            String message = "{\"type\":\"sendXDLMessage1\"}";
            log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);
            rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {
                msg.getMessageProperties().setDelay(time);
                return msg;
            });
            return "sendXDLMessage1 success";
        }
    
        @RequestMapping("/sendXDLMessage2")
        @ResponseBody
        public String sendXDLMessage2() {
            int time = 30000;
            String message = "{\"type\":\"sendXDLMessage2\"}";
            log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);
            rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {
                msg.getMessageProperties().setDelay(time);
                return msg;
            });
            return "sendXDLMessage2 success";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    5. 功能测试

    代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
    创建的交换机
    创建的队列
    可以看到交换机的类型是x-delayed-message。
    接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
    运行结果
    结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。

    镜像操作

    使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
    一切都准备就绪,就可以制作镜像了。

    1. 镜像制作

    将镜像打包成tar文件。

    docker commit 【镜像id】 rabbitmq:3.8.17
    docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17
    
    • 1
    • 2

    2. 镜像导入

    制作完镜像进行导入

    docker load <rabbitmq-3.8.17.tar
    docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17
    
    • 1
    • 2

    总结

    以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。

  • 相关阅读:
    ClickHouse的 MaterializeMySQL引擎
    关注云栖大会的感受:从工业大脑到全面AI时代的进化
    优秀的 Verilog/FPGA开源项目介绍(三十二)-RISC-V(新增俩)
    Django版本选择、Python兼容问题及更新时间(长期更新)
    Opencv图像轮廓检测——转换灰度图像、二值图像、绘制图像轮廓、轮廓特征、轮廓外接圆
    思维导图制作工具推荐
    DPDK KNI介绍
    【PHPWord】PHPWord 根据word模板生成的内容动态生成目录以及页码更新(完整示例源码)
    CSP-J初赛复习大题整理笔记
    day08PKI以及综合实验
  • 原文地址:https://blog.csdn.net/nhx900317/article/details/132689905