注意:XX模式只是一种设计思路,并不是指的具体的某种实现,可以理解为实现XX模式需要怎么去写。
前面我们了解了最简的一个消费者一个生产者的模式,接着我们来了解一下一个生产者多个消费者的情况:
实际上这种模式就非常适合多个工人等待新的任务到来的场景,我们的任务有很多个,一个一个丢进消息队列,而此时工人有很多个,那么我们就可以将这些任务分配个各个工人,让他们各自负责一些任务,并且做的快的工人还可以做完成一些(能者多劳)。
非常简单,我们只需要创建两个监听器即可:
- @Component
- public class TestListener {
- @RabbitListener(queues = "yyds")
- public void receiver(String data){ //这里直接接收String类型的数据
- System.out.println("一号消息队列监听器 "+data);
- }
-
- @RabbitListener(queues = "yyds")
- public void receiver2(String data){
- System.out.println("二号消息队列监听器 "+data);
- }
- }
可以看到我们发送消息时,会自动进行轮询分发:
那么如果我们一开始就在消息队列中放入一部分消息在开启消费者呢?
可以看到,如果是一开始就存在消息,会被一个消费者一次性全部消耗,这是因为我们没有对消费者的Prefetch count(预获取数量,一次性获取消息的最大数量)进行限制,也就是说我们现在希望的是消费者一次只能拿一个消息,而不是将所有的消息全部都获取。
因此我们需要对这个数量进行一些配置,这里我们需要在配置类中定义一个自定义的ListenerContainerFactory,可以在这里设定消费者Channel的PrefetchCount的大小:
- @Resource
- private CachingConnectionFactory connectionFactory;
-
- @Bean(name = "listenerContainer")
- public SimpleRabbitListenerContainerFactory listenerContainer(){
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setPrefetchCount(1); //将PrefetchCount设定为1表示一次只能取一个
- return factory;
- }
接着我们在监听器这边指定即可:
- @Component
- public class TestListener {
- @RabbitListener(queues = "yyds", containerFactory = "listenerContainer")
- public void receiver(String data){
- System.out.println("一号消息队列监听器 "+data);
- }
-
- @RabbitListener(queues = "yyds", containerFactory = "listenerContainer")
- public void receiver2(String data){
- System.out.println("二号消息队列监听器 "+data);
- }
- }
现在我们再次启动服务器,可以看到PrefetchCount被限定为1了:
再次重复上述的实现,可以看到消息不会被一号消费者给全部抢走了:
当然除了去定义两个相同的监听器之外,我们也可以直接在注解中定义,比如我们现在需要10个同样的消费者:
- @Component
- public class TestListener {
- @RabbitListener(queues = "yyds", containerFactory = "listenerContainer", concurrency = "10")
- public void receiver(String data){
- System.out.println("一号消息队列监听器 "+data);
- }
- }
可以看到在管理页面中出现了10个消费者:
至此,有关工作队列模式就讲到这里。