📝个人主页:五敷有你
🔥系列专栏:MQ
⛺️稳中求进,晒太阳
Work queues任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void testSendMessage2Queue() throws InterruptedException {
- String queueName1 = "work.queue";
-
- for(int i=0;i<50;i++){
- String msg = "Hello Work.Queue 编号:"+i;
- rabbitTemplate.convertAndSend(queueName1, msg);
- Thread.sleep(20);
- }
-
- }
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法
- @RabbitListener(queues = "work.queue")
- public void listenWorkQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到了work.queue的消息:【" + msg +"】");
- Thread.sleep(5);
- }
- @RabbitListener(queues = "work.queue")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者1收到了work.queue的消息:【" + msg +"】");
- Thread.sleep(50);
- }
注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,发现结果如下:.
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
Work模型的使用: