• RabbitMQ - 消息堆积问题的最佳解决方案?惰性队列


    目录

    一、惰性队列

    1.1、消息堆积问题

    1.2、消息堆积问题的解决方法

    从消费者的角度:

    从队列的角度:

    1.3、引入惰性队列

    1.3.1、什么是惰性队列

    1.3.2、惰性队列的使用

    1.3.3、效果演示


    一、惰性队列


    1.1、消息堆积问题

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息到达上限.

    这就像是有一个蓄水池,一边往里注水,一边向外排水,但是注水速度比排水快,因此这个水池最终就会填满.

    接着,最早收到的消息,就可能成为死信,默认情况下会把死信丢弃,丢弃了之后队列就有了空间,就可以新消息进入队列.

    1.2、消息堆积问题的解决方法

    从消费者的角度:

    1. 增加更多的消费者,调高消费速度.

    如果说现在只有单个消费者,处理速度是 100ms 一个消息,那么如果有三个消费者,理论上就处理速度就是 300ms 一个消息.

    2.在消费者内开启一个线程池,每当一个消息过来了,就交给一个线程去处理,那么当前这个消费者内部处理消息的速度就加快了.

    但是,这个方法有一个限制,就是如果消息特别多的情况下,你是不是就需要分配很多线程.  线程越多,对 cpu 来讲也是一种浪费,因为 cpu 需要在多个线程之间做上下文切换,所以这个方案比较适合消息处理业务比较耗时的情况,开多个线程,让 cpu 并行处理.

    从队列的角度:

    3. 扩大队列容积,提高堆积上限.

    就像一个蓄水池,我把这个水池搞的像黄河那么大,你再往里进水,啥时候才能填满呢~

    前两种方法我们已经是比较的熟悉的了,但是最后扩大队列容积,具体该如何实现呢?

    惰性队列就可以用来解决这个问题~

    1.3、引入惰性队列

    1.3.1、什么是惰性队列

    对于传统的队列来讲,如果没有开启消息持久化,所有接收到的消息都是放在内存里面的,目的就是为了加快消息投递的速度,这也是 RabbitMQ 的一个很优势——响应速度快.

    但是他也带来了一个问题,RabbitMQ 设置了一个内存预警值(内存存储的上限,默认是 40%),如果在消息堆积的情况下,很容易就到达这个预警值,此时,RabbitMQ 就会处于一个暂停状态,会阻止生产者投递消息进来,然后把内存中的一部分消息清理出来,刷出到磁盘中,这动作也叫 “page out” .  进而导致 mq 的并发能力,忽高忽低,性能不稳定(每次page out 都会比较耗时,停顿一段时间).

    而惰性队列,就是是专门用来处理消息堆积问题的~

    他有以下三个特点:

    1. 收到的消息会直接写入磁盘,而非内存,也因此很难触发 mq 的内存预警,几乎不会出现 page out 的情况.

    2. 消费者消费惰性队列的消息,也需要先从磁盘中读取并加载到内存中.  实际上这也会使延迟稍微高一点,毕竟磁盘的性能和内存还是有很大差距的,但是也在可以接收到范围内.

    3. 支持数百万的消息存储,这也是因为他是磁盘存储(空间非常大).

    1.3.2、惰性队列的使用

    设置一个队列为 惰性队列,有以下三种方式:

    1. 在 RabbitMQ Managerment 中,我们只需要在声明队列的时候,指定 x-queue-mode 属性为 lazy 即可.

    2. 用 SpringAMQP 声明惰性队列,@Bean 方式如下:

    1. @Configuration
    2. public class LazyConfig {
    3. @Bean
    4. public Queue lazyQueue() {
    5. return QueueBuilder
    6. .durable("lazy.queue")
    7. .lazy() //开启 x-queue-mode 属性为 lazy
    8. .build();
    9. }
    10. }

    3. 用 SpringAMQP 声明惰性队列,注解方式如下:

    1. @RabbitListener(queuesToDeclare = @Queue(
    2. name = "lazy.queue",
    3. durable = "true",
    4. arguments = @Argument(name = "x-queue-mode", value = "lazy")
    5. ))
    6. public void listenLazyQueue(String msg) {
    7. log.info("消费者接收到了惰性队列的消息! msg=" + msg);
    8. }

    1.3.3、效果演示

    a)使用 @Bean 声明一个普通队列和一个惰性队列,来对比效果.

    1. @Configuration
    2. public class LazyConfig {
    3. @Bean
    4. public Queue lazyQueue() {
    5. return QueueBuilder
    6. .durable("lazy.queue")
    7. .lazy() //开启 x-queue-mode 属性为 lazy
    8. .build();
    9. }
    10. @Bean
    11. public Queue normalQueue() {
    12. return QueueBuilder
    13. .durable("normal.queue")
    14. .build();
    15. }
    16. }

    b)使用两个生产者,分别向两个队列中发送 100000 消息.

    1. @Test
    2. public void testLazyMessage() {
    3. for(int i = 0; i < 1000000; i++) {
    4. //1.准备消息
    5. Message message = MessageBuilder.withBody("hello lazy message".getBytes())
    6. .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
    7. .build();
    8. //2.发送消息
    9. rabbitTemplate.convertAndSend("lazy.queue", message);
    10. }
    11. }
    12. @Test
    13. public void testNormalMessage() {
    14. for(int i = 0; i < 1000000; i++) {
    15. //1.准备消息
    16. Message message = MessageBuilder.withBody("hello normal message".getBytes())
    17. .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
    18. .build();
    19. //2.发送消息
    20. rabbitTemplate.convertAndSend("normal.queue", message);
    21. }
    22. }

    普通队列情况如下:

    普通队列每次往 memory 中写入一定量的数据后,暂停一段时间,向磁盘写数据(page out).

    lazy 队列情况如下:

    lazy 一直都往磁盘上写数据,所以 total 等于 paged out.

  • 相关阅读:
    MySQL——数据库基础
    Lighting - 虚幻中的进阶灯光(一)
    yolo数据集的制作教程之海绵宝宝数据集的制作
    linux系统docker的使用命令
    【不正经操作】百度深度学习框架paddlepaddle本地运行-Python环境配置笔记
    C会区块链论文速读-TrustCom 2023(5/6)大语言模型和区块链如何结合?
    python经典百题之皮球掉落
    Debian 决定允许无记名投票,候选人Felix Lechner质疑
    设计模式学习笔记 - 单例设计模式
    【推荐系统中的Hash 2】局部敏感哈希(利用冲突)
  • 原文地址:https://blog.csdn.net/CYK_byte/article/details/133122061