• RabbitMQ(四) | 惰性队列 - 解决消息堆积问题



    接上一篇:RabbitMQ(三) | 死信交换机、死信队列、TTL、延迟队列(安装DelayExchange插件)


    1.消息堆积问题

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
    在这里插入图片描述

    解决消息堆积有两种思路:

    • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
    • 给消费者开启多线程,提高消费速度
    • 扩大队列容积,提高堆积上限

    要提升队列容积,把消息保存在内存中显然是不行的。


    2.惰性队列

    从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储

    2.1.基于命令行设置lazy-queue

    而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

    rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
    
    • 1

    命令解读:

    • rabbitmqctl :RabbitMQ的命令行工具
    • set_policy :添加一个策略
    • Lazy :策略名称,可以自定义
    • "^lazy-queue$" :用正则表达式匹配队列的名字
    • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
    • --apply-to queues :策略的作用对象,是所有的队列

    2.2.基于@Bean声明lazy-queue

    在这里插入图片描述

    在consumer服务端的新增惰性队列配置类LazyConfig:

    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /** 惰性队列配置 */
    @Configuration
    public class LazyConfig {
    
        /** 惰性队列 - 增加了.lazy()属性 */
        @Bean
        public Queue lazyQueue() {
            return QueueBuilder.durable("lazy.queue")
                    .lazy() // 开启x-queue-mode为lazy
                    .build();
        }
    
        /** 正常队列 */
        @Bean
        public Queue normalQueue() {
            return QueueBuilder.durable("normal.queue")
                    .build();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.3.基于@RabbitListener声明LazyQueue

    在这里插入图片描述
    在consumer服务端的消息监听类中新增方法

    @RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void listenLazyQueue(String msg) {
        log.info("接收到 lazy.queue 的消息:{}", msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.4.发送消息

    这里对正常队列和惰性队列进行十万条数据的测试,启动consumer服务,在publisher服务的SpringAmqpTest类中新增下列两个方法:

    /** 测试惰性队列 */
    @Test
    public void testLazyQueue() throws InterruptedException {
        // 模拟发送十万条数据
        long b = System.nanoTime();
        for (int i = 0; i < 1000000; i++) {
            // 1.准备消息
            Message message = MessageBuilder
                    .withBody("hello, LazyQueue".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 改成非持久化,可以看一下LazyQueue的效果
                    .build();
            // 2.发送消息
            rabbitTemplate.convertAndSend("lazy.queue", message);
        }
        long e = System.nanoTime();
        System.out.println(e - b);
    }
    
    /** 测试正常队列 */
    @Test
    public void testNormalQueue() throws InterruptedException {
        // 模拟发送十万条数据
        long b = System.nanoTime();
        for (int i = 0; i < 1000000; i++) {
            // 1.准备消息
            Message message = MessageBuilder
                    .withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 改成非持久化,可以看一下正常队列的效果
                    .build();
            // 2.发送消息
            rabbitTemplate.convertAndSend("normal.queue", message);
        }
        long e = System.nanoTime();
        System.out.println(e - b);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    RabbitMQ控制台队列的数据变化:

    初始:
    在这里插入图片描述

    运行两个生产者方法后,看一下两个队列总览

    在这里插入图片描述


    惰性队列数据变化

    全部一进来就会存储到磁盘,内存中只有需要消费的:
    在这里插入图片描述


    正常队列数据变化

    而正常队列都在内存中(这里是没有开启持久化的情况):
    在这里插入图片描述


    3.总结

    消息堆积问题的解决方案?

    • 队列上绑定多个消费者,提高消费速度
    • 给消费者开启多线程,提高消费速度
    • 使用惰性队列,可以在mq中保存更多消息

    惰性队列的优点有哪些?

    • 基于磁盘存储,消息上限高
    • 没有间歇性的page-out,性能比较稳定

    惰性队列的缺点有哪些?

    • 基于磁盘存储,消息时效性会降低
    • 性能受限于磁盘的IO

    下一篇:RabbitMQ(五) | MQ集群搭建、部署、仲裁队列、集群扩容


  • 相关阅读:
    设计通用流程和可变点的方法一些思考
    LInux驱动开发笔记(十)SPI子系统及其驱动
    享元模式 & 基于享元模式的对象池设计与开发应用(设计模式与开发实践 P12)
    c#设计模式-结构型模式 之装饰者模式
    AD入门学习—元件库的创建2
    【传统的网页布局的三种方式】浮动和定位布局
    计算机视觉+人工智能面试笔试总结——ONNX模型
    图像的SIFT特征点提取
    NMS原理及其代码实现
    vue顶部页面加载进度条
  • 原文地址:https://blog.csdn.net/qq_25112523/article/details/125431996