• 【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码


    参考资料

    七、延迟队列

    7.1 什么是延迟队列

    正常的MQ应用场景中,我们希望消息可以快速稳定的传递。但是有一些场景中,希望在指定的延迟后再消费信息,比如订单支付场景(订单15部分内未支付则关闭订单)。

    这类实现延迟任务的场景,就可以采用延迟队列来实现。

    以下介绍一下其他的一些方法。

    7.2 延迟队列的解决方案

    7.2.1 定时任务

    每隔n秒扫描一次数据库,查询数据库装为过期的订单进行处理。

    实现方式

    spring schedule、quartz、xxljob等

    优点

    简单,容易实现;

    缺点

    1. 存在延迟(受定时器延迟时间限制
    2. 性能较差,每次扫描数据库,如果订单量交大,会给数据库造成较大压力。
    7.2.2 被动取消

    当用户主动查询订单时,判断订单是否超时,超时则取消

    • 优点:服务器压力小
    • 缺点:如果用户长时间不查询,则会造成统计异常;而且用户打开订单页面会变慢,严重的话会影响用户体验
    7.2.3 JDK的延迟队列

    DelayedQueue:无界阻塞队列,该队列只有在延迟期满后,才能从中获取元素。

    优点

    实现简单,任务的延迟低。

    缺点

    • 服务器重启宕机,数据会丢失
    • 只适用于单机版
    • 订单量大时,可能会造成内存不足:OOM
    7.2.3 采用消息中间件(rabbitMQ

    RabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递(前面提到的死信队列)。.

    image-20231017141210411

    把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费者可以从DLX的队列中取走消息。

    7.2.3.1 适用专门优化后的死信队列实现延迟队列

    在上面的mq方案中,存在两个不同的交换机,我们可以利用直连交换机的特性,将交换机优化成一个交换机,同时通过不同的routingKey指定普通队列和死信队列。

    image-20231017141445269

    思路解释

    1. 生产者发送消息到交换机X,并指定ttl的key
    2. 消息被交换机传递到ttl队列中(指定了消息过期时间的队列
    3. 同时,ttl队列还指定的死信交换机DLX为自身的交换机X,但是指定的routingKey为死信队列的key
    4. 这样,当消息在ttl队列中到期后,这条消息就会被传递到死信队列中,提供给消费者
    7.2.3.2 ⭐️实例代码

    为了便于测试,将发送和接收写在同一个服务中

    配置信息

    @Configuration
    public class DelayExchangeConfig {
        public static String exchangeName = "order.ttl.exchange";
        public static String orderQ = "order.ttl.queue";
        public static String dlxQ = "order.dlx.queue";
    
        @Bean
        public DirectExchange delayedExchange(){
            return ExchangeBuilder.directExchange(exchangeName).build();
        }
    
        @Bean
        public Queue orderQueue(){
            // 指定该队列的过期时间和死信队列
            Map<String , Object> properties = new HashMap<>();
            properties.put("x-message-ttl" , 15000);
            properties.put("x-dead-letter-exchange" , exchangeName);
            properties.put("x-dead-letter-routing-key" , "dead-letter");
            return QueueBuilder.durable(orderQ).withArguments(properties).build();
        }
    
        @Bean
        public Queue dlxQueue(){
            return QueueBuilder.durable(dlxQ).build();
        }
    
        @Bean
        public Binding dlxBinding1(){
            return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("dead-letter");
        }
    
        @Bean
        public Binding ttlBinding1(){
            return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("order");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    测试代码

    @RestController
    @RequestMapping("/delay")
    @Slf4j
    public class DelayedController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/{msg}")
        public void sentErrorMsg(@PathVariable("msg") String msg) {
            log.info("(延迟队列)准备发送的信息:{} , 路由键 :{}", msg, "order");
            // 发送到普通的延时列表中
            rabbitTemplate.convertAndSend(exchangeName, "order", msg.getBytes(StandardCharsets.UTF_8));
            log.info("(延迟队列)成功发送!发送时间{}" , LocalDateTimeUtil.now());
        }
    
        @RabbitListener(queues = "order.dlx.queue")
        public void receiveDelayedMsg(Message message){
            log.info("(延迟队列)接受到的消息是:{}" , new String(message.getBody()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    7.2.3.2 测试结果

    配置正确

    image-20231017144033384

    控制台打印正确:15秒后接收到的了之前发送的信息

    image-20231017144116843


    7.2.4 使用rabbitmq_delayed_message_exchange插件.
    7.2.4.1 插件下载

    插件下载地址

    • https://www.rabbitmq.com/community-plugins.html
    • https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
      • 根据自己的rabbit版本,我这里用的是3.9
    7.2.4.2 ⭐️如何在docker环境下安装插件

    参考文章:https://juejin.cn/post/7138717546894589966

    1. 将下载到的文件,移动到容器内

      docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
      
      • 1

    image-20231017153230781

    1. 进入容器bash指令,并启动插件

      docker exec -it rabbitmq bash
      
      root@rabbit:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      
      # 使用下面的指令查看插件列表
      rabbitmq-plugins list
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    image-20231017153257970

    进入控制台新建交换机,能查看到新的交换机类型

    image-20231017154024943

    7.2.4.3 ⭐️ 代码示例:如何使用该插件

    官方说明文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#usage

    image-20231017153803323

    理解原理:delay exchange在接受到消息后,会先存在内部数据库中,检查x-delay延迟时间(头部

    image-20231017154940504

    代码使用思路

    1. 要创建自定义的交换机类型,要使用CustomExchange()来创建。几个参数的解释如下:

      • name:rabbit中交换机的名称
      • type:交换机类型 (x-delayed-message)
      • durable:是否持久
      • autoDelete:是否自动删除
      • arguments:参数信息
    2. arguments:参数信息从官方文档中获取

      // ... elided code ...
      Map<String, Object> args = new HashMap<String, Object>();
      args.put("x-delayed-type", "direct");
      channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
      // ... more code ...
      
      • 1
      • 2
      • 3
      • 4
      • 5
    3. 交换机创建好后,只需要创建一条队列即可,并进行绑定

    4. 注意:消息发送需要在头部存放信息headers.put("x-delay", 延迟时间)。不需要使用自带的expiration来控制延迟时间了

    配置类

    @Configuration
    public class DelayPluginConfig {
        public static String exchangeName = "delay-x-plugin.x";
        public static String key = "demo";
        @Bean
        public CustomExchange customExchange(){
            // 参考官方文档,创建插件提供的自定义交换机
            Map args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            // public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map arguments)
            return new CustomExchange(exchangeName, "x-delayed-message" , true , false , args);
        }
    
        @Bean
        public Queue delayDemoQueue(){
            return QueueBuilder.durable("delay-x-plugin.queue.demo").build();
        }
    
        @Bean
        public Binding delayPluginBinding(){
            return BindingBuilder
                    .bind(delayDemoQueue())
                    .to(customExchange())
                    .with(key)
                    .noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    生产者

    @RestController
    @RequestMapping("/delay/plugin")
    @Slf4j
    public class DelayedPluginController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/{delay}/{msg}")
        public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("delay") Long delay) {
            log.info("(延迟插件队列)准备发送的信息:{} ,延迟时间:{} 路由键 :{}", msg, delay , "demo");
            // 在头部设置过期时间
            MessageProperties properties = new MessageProperties();
            properties.setHeader("x-delay", delay);
            Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
            // 发送信息
            rabbitTemplate.convertAndSend(exchangeName, "demo", message);
            log.info("(延迟插件队列)成功发送!发送时间:{}", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()));
        }
    
        @RabbitListener(queues = "delay-x-plugin.queue.demo")
        public void receiveDelayedMsg(Message message) {
            log.info("(延迟插件队列)接受到的消息是:{}", new String(message.getBody()));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    7.2.4.4 测试结果

    生成交换机和队列

    image-20231017160126659image-20231017160147125

    访问路径/delay/plugin/25000/一条25秒过期的信息:查看日志打印:成功

    image-20231017160422203

    7.3 问题:多个消息的延迟时间不同该如何解决?

    由于队列先进先出的特性,如果不同消息的延迟时间不同,一旦出现后进的消息延迟时间小于先进的队列,那么消息过期的时间就会出错。

    7.3.1 解决方案一:用延迟队列区分

    要解决这个问题,就需要将队列的延迟时间统一,将不同的延迟的消息发送到对应延迟的队列中。

    保证队列的延迟时间和消息的延迟时间是一样的即可。

    如下

    image-20231017144817671

    7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

    由于该插件的原理并不是单纯的队列实现,而是使用rabbit内部数据库时间,所以可以很好的解决问题。

    可以进行一个简单测试验证:

    • 先发送一条25秒过期的信息,再发送3条5秒过期的信息

    • 查看结果:正常消费,解决问题

      image-20231017160917110

  • 相关阅读:
    PowerBI真经连续剧
    Pytorch - 使用torchsummary/torchsummaryX/torchinfo库打印模型结构、输出维度和参数信息
    OpenCV C++ 图像 批处理 (批量调整尺寸、批量重命名)
    97. 常用的HTTP服务压测工具
    矩形-圆动画转换动画效果
    【C++】C++11 ——— 可变参数模板
    C/C++基于词频的文件相似度
    对比编程语言的四种错误处理方法,哪种才是最优方案?
    k8s--基础--23.5--认证-授权-准入控制--通过token令牌登陆dashboard界面
    window 下 达梦数据库的备份和还原
  • 原文地址:https://blog.csdn.net/Xcong_Zhu/article/details/133886828