• RabbitMQ------延迟队列(整合SpringBoot以及使用延迟插件实现真正延时)(七)


    RabbitMQ------延迟队列(七)

    延迟队列

    延迟队列,内部是有序的,特点:延时属性。
    简单讲:延时队列是用来存放需要在指定时间被处理的元素队列。
    是基于死信队列的消息过期场景。

    适用场景

    1.订单在十分钟之内未支付则自动取消。
    2.用户注册后,三天内没有登陆,则短信提醒。
    特点:需要在某个事件发生之后或者之前的特定事件点完成莫一项任务。

    整合SpringBoot

    导入依赖

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    <!--        RabbitMQ依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.34</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-swagger2</artifactId>
                <version>2.6.1</version>
            </dependency>
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-swagger-ui</artifactId>
                <version>2.6.1</version>
            </dependency>
     <!--        RabbitMQ测试依赖-->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <version>2.4.7</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    在配置文件application.properties中写明rabbitmq的IP、端口、用户名以及密码

    spring.rabbitmq.host=192.168.200.139
    spring.rabbitmq.prot=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    
    • 1
    • 2
    • 3
    • 4

    架构图如图所示
    在这里插入图片描述
    队列1设置的过期时间为10s,队列2设置的过期时间为40s。
    代码分三部分:生产者、消费者、以及交换机队列整体作为一部分。
    生产者和消费者都不在进行交换机以及队列的声明。
    交换机以及队列配置类的书写:

    /**
     * TTL队列  配置文件类代码
     */
    @Configuration
    public class TTLQueueConfig {
        //普通交换机
        public static final String X_EXCHANGE = "X";
        //死信交换机
        public static final String Y_EXCHANGE = "Y";
        //普通队列1 过期时间10s
        public static final String QA_QUEUE = "QA";
        //普通队列2 过期时间40s
        public static final String QB_QUEUE = "QB";
        //死信队列
        public static final String QD_QUEUE = "QD";
    
        //声明X交换机
        @Bean("xExchange")
        public DirectExchange xExchange(){
            return new DirectExchange(X_EXCHANGE);
        }
    
        //声明X交换机
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return new DirectExchange(Y_EXCHANGE);
        }
    
        //声明普通队列1
        @Bean("queueA")
        public Queue queueA(){
            Map<String,Object> arguments = new HashMap<>();
            //设置死信队列
            arguments.put("x-dead-letter-exchange",QD_QUEUE);
            //设置routingkey
            arguments.put("x-dead-letter-routing-key","YD");
            //设置ttl
            arguments.put("x-message-ttl",10000);
            return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();
        }
    
        //声明普通队列2
        @Bean("queueB")
        public Queue queueB(){
            Map<String,Object> arguments = new HashMap<>();
            //设置死信队列
            arguments.put("x-dead-letter-exchange",QD_QUEUE);
            //设置routingkey
            arguments.put("x-dead-letter-routing-key","YD");
            //设置ttl
            arguments.put("x-message-ttl",40000);
            return QueueBuilder.durable(QB_QUEUE).withArguments(arguments).build();
        }
    
        //声明死信队列
        @Bean("queueD")
        public Queue queueD(){
            return QueueBuilder.durable(QD_QUEUE).build();
        }
        
        //绑定
        //通过容器名字进行捆绑,绑定普通队列A和交换价X
        @Bean
        public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
        //通过容器名字进行捆绑,绑定普通队列B和交换价X
        @Bean
        public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueB).to(xExchange).with("XA");
        }
        //通过容器名字进行捆绑,绑定死信队列D和交换价Y
        @Bean
        public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                      @Qualifier("yExchange") DirectExchange yExchange){
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    生产者代码示例:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     * 发送延迟
     * 生产者
     */
    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMsgController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //开始发消息
        @GetMapping("/send/{message}")
        public void sendMsg(@PathVariable String message){
            log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date().toString(),message);
    
            /**
             * 交换机
             * routingkey
             * message
             */
            rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的队列:"+message);
            rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的队列:"+message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    消费者代码示例:

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 队列TTL 消费者
     */
    @Slf4j
    @Component
    public class DeadLetterConsumer {
    
        //接收消息
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception {
            String string = new String(message.getBody(),"UTF-8");
            log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),string);
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    注意导包,不要导错了。

    结果:第一条消息在10s后变成死信消息,然后被消费者消费掉,第二条消息在40s后变成死信消息,然后被消费者消费掉,这样就达成了延迟队列的目的。

    局限性:每增加一个延时需求,都需要新增一个普通队列。这样是不合理的。
    优化:只有一个延时队列,由生产者指定需要延时多久

    延时队列优化,由生产者指定延时时间

    增加一个队列QC,QC不设置过期时间,过期时间由生产者指定。
    配置类代码新增QC,不设置存活时间,由生产者发送

        //设置普通队列
        public static final String QC_QUEUE = "QC";
    
        //设置普通队列
        @Bean("queueC")
        public  Queue queueC(){
            HashMap<String,Object> arguments = new HashMap<>();
            //设置死信队列
            arguments.put("x-dead-letter-exchange",QD_QUEUE);
            //设置routingkey
            arguments.put("x-dead-letter-routing-key","YD");
            return QueueBuilder.durable(QC_QUEUE).withArguments(arguments).build();
        }
        //绑定
        //通过容器名字进行捆绑,绑定普通队列A和交换价X
        @Bean
        public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    生产者新增代码

    //开始发消息
        @GetMapping("/sendExpirationMessage/{message}/{ttlTime}")
        public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
            log.info("当前时间:{},发送一条消息给QC,ttl队列:{},过期时间为:{}ms",new Date().toString(),message,ttlTime);
    
            /**
             * 交换机
             * routingkey
             * message
             * MessagePostProcessor,可以设置存活时间
             */
            //ttlTime设置置过期时间
            rabbitTemplate.convertAndSend("X","XC","消息来自ttl的队列:"+message,msg->{
                //发送消息时,设置存活时间
                msg.getMessageProperties().setExpiration(ttlTime);
             return msg;   
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    使用这种方式,消息并不会按时死亡。因为RabbitMQ只会检测第一个消息是否过期,如果过期,会被放入死信队列。

    经过测试发现,第一个发送20s过期的消息,第二个发送2s过期的消息,结果依然是20s后,20s消息被消费,之后,2s消息才会被消费。 说明延时队列是按顺序执行。如果第一个消息延时很久,后续消息也会延时,并不会优先执行。

    此现象只能通过,基于插件的RabbitMQ进行弥补,自身无法弥补这个缺陷。

    RabbitMQ插件实现延时队列

    安装插件

    在官网上下载

    https://www.rabbitmq.com/community-plugins.html
    
    • 1

    下载rabbitmq_delayed_message_exchange插件。解压放在RabbitMQ的插件目录。
    进入RabbitMQ的安装目录下的plgins目录,执行以下命令让该插件生效,然后重启RabbitMQ。

    --  3.8.8代表rabbitmq版本
    -- 目录如下
    cd   /usr/lib/rabbitmq/rabbitmq_server_3.8.8/plugins
    -- 安装命令,不用写插件版本号
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    -- 重启rabbitmq
    systemctl restart  rabbitmq-server(安装时的服务名)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    重启好后打开rabbitmq的管理端页面,可以在Exchanges目录下,Add a new exchange,Type 中,会增加一个x-delayed-message的选项。
    使用插件,结构更加简单
    代表由交换机进行延迟,而不是队列了。
    在这里插入图片描述
    配置类书写
    当Bean中不指定名称时,名称默认方法名
    自定义交换机时,需要指定交换机类型,而之前未自定交换机,直接创建的DirectExchange交换机

    /**
     * 延迟交换机
     */
    @Configuration
    public class DelayedQueueConfig {
    
        //延迟
        public static final String DELAYED_EXCHANGE = "delayed.exchange";
    
        //延迟队列
        public static final String DELAYED_QUEUE = "delayed.queue";
    
        //routingkey
        public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    
    
        //声明  自定义交换机,基于插件
        @Bean
        public CustomExchange delayedExchange(){
            //String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
            /**
             * 交换机名字
             * 类型
             * 是否持久化
             * 是否自动删除
             * 自定义参数
             */
            HashMap<String,Object> arguments = new HashMap();
            arguments.put("x-delayed-type","direct");
            return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",
                    false,false,arguments);
        }
    
        //声明延迟队列
        @Bean
        public Queue queueDe(){
            return new Queue(DELAYED_QUEUE);
        }
    
        //绑定  当Bean中不指定名称时,名称默认方法名
        @Bean
        public Binding queueBindingExchange(
                @Qualifier("queueDe") Queue queue,
                @Qualifier("delayedExchange") CustomExchange customExchange){
            return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    生产者以及消费者代码与之前相同。
    结论:可以实现根据过期时间,消费消息。

    延时队列也有很多其他的选择,比如Java的DelayQueue,利用Redis的Zset,利用Quartz或者利用kafka的时间轮,各有特点,需要合适的场景。

  • 相关阅读:
    华为奔赴“空间智能”,全屋智能的逻辑变了吗?
    Vue中的计算属性、方法和侦听属性的区别入门版
    【开题报告】疫苗在线预约小程序的设计与实现
    Learn Prompt- Midjourney案例:Logo设计
    【Python】Python语法基础——条件语句与循环
    国际人工智能泰斗—迈克尔·乔丹
    基于LTE的车联网无线通信技术 直连通信系统路侧单元技术要求
    Hadoop超详细入门(一)介绍及安装
    setoolkit启动报错的问题
    零基础Linux_26(多线程)线程池代码+单例模式+线程安全
  • 原文地址:https://blog.csdn.net/cz_chen_zhuo/article/details/127824693