• 基于插件实现RabbitMQ“延时队列“


    1.官网下载

    添加链接描述下载rabbitmq_delayed_message_exchange 插件,本文以v3.10.0为例
    在这里插入图片描述

    1.1.上传安装包

    scp /Users/hong/资料/rabbitmq_delayed_message_exchange-3.10.0.ez  root@10.211.55.4:/usr/local/software
    
    • 1

    在这里插入图片描述
    在这里插入图片描述

    1.2.将文件移入RabbitMQ的安装目录下的plugins目录

    mv rabbitmq_delayed_message_exchange-3.10.0.ez /usr/local/software/rabbitmq_server-3.10.0/plugins
    
    • 1

    在这里插入图片描述
    在这里插入图片描述

    1.3.安装插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1

    在这里插入图片描述

    1.4 重启后验证

    rabbitmq-server start
    
    • 1

    在这里插入图片描述

    在这里插入图片描述

    2.两种实现方式图解

    在这里插入图片描述
    在这里插入图片描述

    3.基于插件的延迟队列配置类

    在这里插入图片描述

    package com.hong.springboot.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    
    import java.util.HashMap;
    import java.util.Map;
    
    
    /**
     * @Description: 延迟队列配置类
     * @Author: hong
     * @Date: 2024-02-25 20:19
     * @Version: 1.0
     **/
    @Configuration
    public class DelayedQueueConfig {
        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
        public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
    
        /**
         * 基于延迟插件声明自定义交换机
         * 在自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制,消息
         * 传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,
         * 当达到投递时间才会投递到目标队列中
         * @return
         */
        @Bean
        public CustomExchange delayedExchange(){
            Map<String,Object> map = new HashMap<>();
            map.put("x-delayed-type","direct");
            /**
             * 声明自定义交换机
             * 第1个参数:交换机名称
             * 第2个参数:交换机类型
             * 第3个参数:是否需要持久化
             * 第4个参数:是否需要自动删除
             * 第5个参数:其他参数
             */
            return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,map);
        }
    
        @Bean
        public Queue delayedQueue(){
            return  new Queue(DELAYED_QUEUE_NAME);
        }
    
        @Bean
        public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                      @Qualifier("delayedExchange") CustomExchange delayedExchange) {
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    4.生产者发送消息

        /**
         * 基于延迟插件的发送消息
         * @param message
         * @param delayTime 延迟时间
         */
        @GetMapping("sendDelayMsg/{message}/{delayTime}")
        public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
            rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, correlationData -> {
                correlationData.getMessageProperties().setDelay(delayTime);
                return correlationData;
            });
            log.info("当前时间:{},发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , delayTime, message);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    5.消费者端代码

    package com.hong.springboot.rabbitmq.consumer;
    
    import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * @Description: 基于延迟插件的延迟消费者
     * @Author: hong
     * @Date: 2024-02-25 21:27
     * @Version: 1.0
     **/
    @Slf4j
    @Component
    public class DelayQueueConsumer {
        @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
        public void receiveDelayMessage(Message message){
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到延迟队列信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 1/20000
    http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 2/2000
    在这里插入图片描述
    基于插件的延迟与基于死信队列的结果恰好相反更符合预期,因此在实际项目中通常采用延迟插件方式来实现rabbitMQ的延迟队列

  • 相关阅读:
    Android---App 崩溃
    LeetCode 958. 二叉树的完全性校验
    ubuntu访问github慢
    c++中用opengl的gl函数在三维空间中绘制圆形和画球体
    CANN算子:利用迭代器高效实现Tensor数据切割分块处理
    嵌入式函数调用入栈与出栈
    数据分析可视化常用图介绍以及相关代码实现(箱型图、Q-Q图、Kde图、线性回归图、热力图)
    2023年9月 青少年软件编程等级考试Scratch四级真题,含答案解析
    小县城蔬菜配送小程序制作全攻略
    前端使用 Konva 实现可视化设计器(5)- 磁贴效果
  • 原文地址:https://blog.csdn.net/qq_41596346/article/details/136199289