• SpringAMQP WorkQueue消息队列模型的理解与使用


    原理分析

    Work Queue,故名思意,工作队列,互相配合工作。适用于消息密集型消息队列的场景,如下图所示,queue中存在着大量的消息,而消费者有续配合工作,消息队列有阅后即焚的特点,所以不会出现重复消费的情况。

    在这里插入图片描述

    工作队列属于一个设计模型的概念,并不是一项复杂的技术,简单来说就是一个消息队列绑定多个消费者。利用SpringAMQP,我们能很容易搭建出一个WorkQueue。

    代码实现

    从上面的图中,我们知道工作队列的设计要素:

    • queue中有大量的消息需要消费
    • 消费者有多个

    基于上面的需求,我们再设计一个场景:

    • 消费者1的处理速度为每秒50条消息
    • 消费者2的处理速度为每秒5条
    • 生产者不限速

    为什么要设计这个场景,一会再分析,我们先简单地搭一个小小的测试Demo。

    生产者:

        @Test
        public void testWorkQueue() {
            String queueName = "work.queue";
            String message = "hello world!___";
            for(int i = 0; i < 50; i++) {
                rabbitTemplate.convertAndSend(queueName,message + i);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消费者1:

    每秒处理50条

        @RabbitListener(queues = "work.queue")
        public void listenWorkQueue1(String msg) throws InterruptedException {
            System.out.println("20 listen:" + msg);
            Thread.sleep(20);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消费者2:

    每秒处理5条

        @RabbitListener(queues = "work.queue")
        public void listenWorkQueue2(String msg) throws InterruptedException {
            System.err.println("200 listen:" + msg);
            Thread.sleep(200);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    运行结果分析

    在运行前,我们设想一下,一共50条消息,消费者1每秒能处理50条,消费者2每秒能处理5条,那总消费时长会是多少呢?消费者1处理消息多还是消费者2处理的消息多?

    带着这个疑问去运行一下,下面的是我的控制台执行结果

    200 listen:hello world!___0 time:10:50:53.070
    20 listen:hello world!___1 time:10:50:53.070
    20 listen:hello world!___3 time:10:50:53.098
    20 listen:hello world!___5 time:10:50:53.130
    20 listen:hello world!___7 time:10:50:53.165
    20 listen:hello world!___9 time:10:50:53.196
    20 listen:hello world!___11 time:10:50:53.228
    20 listen:hello world!___13 time:10:50:53.251
    20 listen:hello world!___15 time:10:50:53.284
    200 listen:hello world!___2 time:10:50:53.284
    20 listen:hello world!___17 time:10:50:53.316
    20 listen:hello world!___19 time:10:50:53.352
    20 listen:hello world!___21 time:10:50:53.383
    20 listen:hello world!___23 time:10:50:53.415
    20 listen:hello world!___25 time:10:50:53.435
    20 listen:hello world!___27 time:10:50:53.473
    200 listen:hello world!___4 time:10:50:53.489
    20 listen:hello world!___29 time:10:50:53.508
    20 listen:hello world!___31 time:10:50:53.541
    20 listen:hello world!___33 time:10:50:53.577
    20 listen:hello world!___35 time:10:50:53.610
    20 listen:hello world!___37 time:10:50:53.641
    20 listen:hello world!___39 time:10:50:53.671
    200 listen:hello world!___6 time:10:50:53.704
    20 listen:hello world!___41 time:10:50:53.704
    20 listen:hello world!___43 time:10:50:53.736
    20 listen:hello world!___45 time:10:50:53.767
    20 listen:hello world!___47 time:10:50:53.802
    20 listen:hello world!___49 time:10:50:53.835
    200 listen:hello world!___8 time:10:50:53.907
    200 listen:hello world!___10 time:10:50:54.127
    200 listen:hello world!___12 time:10:50:54.335
    200 listen:hello world!___14 time:10:50:54.553
    200 listen:hello world!___16 time:10:50:54.769
    200 listen:hello world!___18 time:10:50:54.983
    200 listen:hello world!___20 time:10:50:55.183
    200 listen:hello world!___22 time:10:50:55.384
    200 listen:hello world!___24 time:10:50:55.586
    200 listen:hello world!___26 time:10:50:55.803
    200 listen:hello world!___28 time:10:50:56.017
    200 listen:hello world!___30 time:10:50:56.227
    200 listen:hello world!___32 time:10:50:56.442
    200 listen:hello world!___34 time:10:50:56.655
    200 listen:hello world!___36 time:10:50:56.872
    200 listen:hello world!___38 time:10:50:57.074
    200 listen:hello world!___40 time:10:50:57.285
    200 listen:hello world!___42 time:10:50:57.491
    200 listen:hello world!___44 time:10:50:57.698
    200 listen:hello world!___46 time:10:50:57.912
    200 listen:hello world!___48 time:10:50:58.118

    结果分析
    回答一下上面的疑问,总消费时长:10:50:58.118 - 10:50:53.070 ,大概是5秒左右,两个线程都分别消费了25条,而且是有序地消费,消费者1的是单数,消费者2的是双数。

    这个结果我们并不满意,消费者1的速度是消费者的速度的10倍,我为啥要把活平分呢?这个与轮询机制效果很相似,但是在消息队列里它不叫轮询,叫做消息预取。消费者1与消费者2会提前获得消息的分配。按照我们理想的状态,消费者1其实自己1秒就能把活给都干了,和消费者2合作之后,反而还加长到了5秒!那我加多一个消费者干嘛?

    解决方案

    针对这个问题,我们会考虑,能不能让这消费者2干不了就别接这么多活?答案是可以的,而且很容易,只需在添加一下配置文件
    application.yml

    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1 # 每次只能获取1条
    
    • 1
    • 2
    • 3
    • 4
    • 5

    prefetch就是预取的意思,默认值是无限,有多少就拿多少,在这里限制它每次只能预取1条,那就可以限制消费者2干不了又揽太多活的现象了。

    再执行一次,查看结果

    200 listen:hello world!___0 time:11:25:50.496
    20 listen:hello world!___1 time:11:25:50.496
    20 listen:hello world!___2 time:11:25:50.527
    20 listen:hello world!___3 time:11:25:50.558
    20 listen:hello world!___4 time:11:25:50.589
    20 listen:hello world!___5 time:11:25:50.622
    20 listen:hello world!___6 time:11:25:50.653
    20 listen:hello world!___7 time:11:25:50.685
    200 listen:hello world!___8 time:11:25:50.708
    20 listen:hello world!___9 time:11:25:50.708
    20 listen:hello world!___10 time:11:25:50.745
    20 listen:hello world!___11 time:11:25:50.774
    20 listen:hello world!___12 time:11:25:50.814
    20 listen:hello world!___13 time:11:25:50.859
    20 listen:hello world!___14 time:11:25:50.890
    20 listen:hello world!___15 time:11:25:50.922
    200 listen:hello world!___16 time:11:25:50.922
    20 listen:hello world!___17 time:11:25:50.945
    20 listen:hello world!___18 time:11:25:50.970
    20 listen:hello world!___19 time:11:25:51.001
    20 listen:hello world!___20 time:11:25:51.035
    20 listen:hello world!___21 time:11:25:51.072
    20 listen:hello world!___22 time:11:25:51.100
    20 listen:hello world!___23 time:11:25:51.135
    200 listen:hello world!___24 time:11:25:51.135
    20 listen:hello world!___25 time:11:25:51.167
    20 listen:hello world!___26 time:11:25:51.198
    20 listen:hello world!___27 time:11:25:51.229
    20 listen:hello world!___28 time:11:25:51.267
    20 listen:hello world!___29 time:11:25:51.298
    20 listen:hello world!___30 time:11:25:51.330
    200 listen:hello world!___31 time:11:25:51.347
    20 listen:hello world!___32 time:11:25:51.376
    20 listen:hello world!___33 time:11:25:51.405
    20 listen:hello world!___34 time:11:25:51.433
    20 listen:hello world!___35 time:11:25:51.459
    20 listen:hello world!___36 time:11:25:51.491
    20 listen:hello world!___37 time:11:25:51.521
    200 listen:hello world!___39 time:11:25:51.553
    20 listen:hello world!___38 time:11:25:51.553
    20 listen:hello world!___40 time:11:25:51.588
    20 listen:hello world!___41 time:11:25:51.620
    20 listen:hello world!___42 time:11:25:51.647
    20 listen:hello world!___43 time:11:25:51.702
    20 listen:hello world!___44 time:11:25:51.725
    20 listen:hello world!___45 time:11:25:51.756
    200 listen:hello world!___46 time:11:25:51.756
    20 listen:hello world!___47 time:11:25:51.789
    20 listen:hello world!___48 time:11:25:51.810
    20 listen:hello world!___49 time:11:25:51.846

    总时长接近1秒执行完了,消费者2也仅仅消费了7条消息。可能有些同学会疑问,怎么还是大于了1秒呢?不是应该在1秒内执行完吗?消费者1每秒能处理50条,但他才消费了33条就大于1秒了!这锅消费者1不背哈,这个是Thread.sleep()的锅,sleep()完并不一定马上就能执行的,只是进入了就绪态,不是执行态,他还要等cpu的调度。

  • 相关阅读:
    BeanShell 如何加密加签?
    CREO:CREO软件之工程图【创建】以及配置(符合国家标准)的简介及其使用方法(图文教程)之详细攻略
    python循环中的continue和break
    WebSocket
    C++ 循环解析
    最短路Dijkstra和最小生成树Prim算法的同异
    springboot+微信小程序的点餐系统(开题报告+论文+答辩PPT+源码)
    微信小程序-npm扩展工具包
    24节气查询易语言代码
    单调栈代码
  • 原文地址:https://blog.csdn.net/interestANd/article/details/128012857