• 【RabbitMQ】SpringBoot整合RabbitMQ实现延时队列


    【RabbitMQ】SpringBoot整合RabbitMQ实现延时队列

    实现原理

    • 1、什么是死信队列

      • 死信队列:DLX,dead-letter-exchange
      • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
    • 2、消息变成死信有以下几种情况

      • 消息被拒绝(basic.reject / basic.nack),并且requeue = false

      • 消息TTL过期

      • 队列达到最大长度

    • 3、死信处理过程

      • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
      • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
      • 可以监听这个队列中的消息做相应的处理。
    • 4、死信队列实现延时消息原理

      • 我们将消息发送到消息队列中,并设置一个过期时间,该队列没有消费者
      • 消息的过期时间到了之后,由于没有消费者,就会进入死信队列
      • 我们用一个消费者接收死信队列的消息,就能达到延迟消息的目的

    image-20220917010553091

    实现步骤

    • 1、引入 maven 依赖
    <!-- rabbitmq消息队列 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 2、配置 MQ 连接信息
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 3、创建队列,并绑定交换机
      • 分别创建了私信队列和普通消息队列,普通消息队列设置了消息过期时间为 10s,设置了死信交换机、死信 routing_key,消息过期之后就能进入了死信队列中
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 死信队列
         */
        public static final String DLX_QUEUE = "dlx_queue";
        public static final String DLX_EXCHANGE = "dlx_exchange";
        public static final String DLX_ROUTING_KEY = "dlx_routing_key";
    
        /**
         * 正常队列
         */
        public static final String MSG_QUEUE = "msg_queue";
        public static final String MSG_EXCHANGE = "msg_exchange";
        public static final String MSG_ROUTING_KEY = "msg_routing_key";
    
        /**
         * 死信队列
         *
         * @return
         */
        @Bean
        Queue dlxQueue() {
            return new Queue(DLX_QUEUE, true, false, false);
        }
    
        /**
         * 死信交换机
         *
         * @return
         */
        @Bean
        DirectExchange dlxExchange() {
            return new DirectExchange(DLX_EXCHANGE, true, false);
        }
    
        /**
         * 绑定死信队列和死信交换机
         *
         * @return
         */
        @Bean
        Binding dlxBinding() {
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                    .with(DLX_ROUTING_KEY);
        }
    
        /**
         * 普通消息队列
         *
         * @return
         */
        @Bean
        Queue msgQueue() {
            Map<String, Object> args = new HashMap<>();
            //设置消息过期时间
            //1s * 3600 * 24 * 7 = 7day
            args.put("x-message-ttl", 1000 /** 3600 * 24 * 7*/);
            //设置死信交换机
            args.put("x-dead-letter-exchange", DLX_EXCHANGE);
            //设置死信 routing_key
            args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            return new Queue(MSG_QUEUE, true, false, false, args);
        }
    
        /**
         * 普通交换机
         *
         * @return
         */
        @Bean
        DirectExchange msgExchange() {
            return new DirectExchange(MSG_EXCHANGE, true, false);
        }
    
        /**
         * 绑定普通队列和与之对应的交换机
         *
         * @return
         */
        @Bean
        Binding msgBinding() {
            return BindingBuilder.bind(msgQueue())
                    .to(msgExchange())
                    .with(MSG_ROUTING_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 4、普通队列消息生产者
    /**
     * @description: 消息生产者
     * @author: 刘宇浩
     * @date: 2022/9/16 16:56
     */
    @Slf4j
    @Component
    public class MsgProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public boolean sendMsgForBlackList(String userId) {
            try {
                rabbitTemplate.convertAndSend(RabbitMqConfig.MSG_EXCHANGE, RabbitMqConfig.MSG_ROUTING_KEY, userId);
                log.info("-------------------消息发送成功");
                return true;
            } catch (Exception e) {
                log.error("-------------------消息发送失败");
                return false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 5、死信队列消费者
    /**
     * @description: 消息消费者
     * @author: 刘宇浩
     * @date: 2022/9/16 16:56
     */
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class DlxConsumer {
    
        @RabbitListener(queues = RabbitMqConfig.DLX_QUEUE)
        public void handle(String message) {
            log.info("------------------收到消息:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    测试

    访问接口,这里贴出控制台响应的数据:

    image-20220917011422156

  • 相关阅读:
    【数据结构与算法】不就是数据结构
    一篇文章讲清楚MySQL的聚簇/联合/覆盖索引、回表、索引下推
    vivado产生报告阅读分析6-时序报告2
    Spring Cloud alibaba 集成 nacos 以及整合 Ribbon 与 Feign 实现负载调用(3)
    从0到1学SpringCloud——12 gateway 动态配置网关路由规则
    刷题之路:1196 - 【入门】递归版拐角I题解
    C++基础入门
    Linux中DNS的正向和反向解析
    Go 单元测试之mock接口测试
    [计算机提升] 计算机病毒
  • 原文地址:https://blog.csdn.net/weixin_63566550/article/details/126900137