• 【RabbitMQ】WorkQueue


           📝个人主页:五敷有你      

     🔥系列专栏:MQ

    ⛺️稳中求进,晒太阳

    Work Queues

    Work queues任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

    当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

    此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

    消息发送

    这次我们循环发送,模拟大量消息堆积现象。

    在publisher服务中的SpringAmqpTest类中添加一个测试方法

    1. @Autowired
    2. private RabbitTemplate rabbitTemplate;
    3. @Test
    4. void testSendMessage2Queue() throws InterruptedException {
    5. String queueName1 = "work.queue";
    6. for(int i=0;i<50;i++){
    7. String msg = "Hello Work.Queue 编号:"+i;
    8. rabbitTemplate.convertAndSend(queueName1, msg);
    9. Thread.sleep(20);
    10. }
    11. }

    消息接收

    要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法

    1. @RabbitListener(queues = "work.queue")
    2. public void listenWorkQueue1(String msg) throws InterruptedException {
    3. System.out.println("消费者1收到了work.queue的消息:【" + msg +"】");
    4. Thread.sleep(5);
    5. }
    6. @RabbitListener(queues = "work.queue")
    7. public void listenWorkQueue2(String msg) throws InterruptedException {
    8. System.err.println("消费者1收到了work.queue的消息:【" + msg +"】");
    9. Thread.sleep(50);
    10. }

    注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

    • 消费者1 sleep了5毫秒,相当于每秒钟处理200个消息
    • 消费者2 sleep了50毫秒,相当于每秒处理20个消息

    消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

    能者多劳

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

    再次测试,发现结果如下:.

    可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

    正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

    总结

    Work模型的使用:

    • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
    • 通过设置prefetch来控制消费者预取的消息数量
  • 相关阅读:
    哈希表题目:两数之和
    基于AI+视频智能分析技术的SkeyeVSS建筑废弃物监管解决方案
    丁鹿学堂:重学js的设计模式,彻底掌握(一)
    【RcoketMQ】RcoketMQ 5.0新特性(二)- Pop消费模式
    华为开发者大会2022:HMS Core 3D建模服务再升级,万物皆可驱动
    94-Java的转换流:解决代码与文件编码不一致读取乱码的问题
    详讲!!红黑树(最优二叉树)
    Taurus.MVC 微服务框架 入门开发教程:项目部署:3、微服务应用程序版本升级:全站升级和局部模块升级。
    C++面向对象OOP-友元与运算符重载
    Linux使用YUM安装程序准备
  • 原文地址:https://blog.csdn.net/m0_62645012/article/details/136547299