• rabbitMq 针对于当前监听的队列,来控制消费者并发数量,不影响其他队列,代码示例


    @Configuration
    @ConditionalOnClass(SimpleRabbitListenerContainerFactory.class)
    public class ConsumerConfig {
    
    	@Value("${rabbit.batch.num:100}")
    	private int batchNum;
    
    	@Bean("batchQueueRabbitListenerContainerFactory")
    	public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    		factory.setConnectionFactory(connectionFactory);
    		factory.setBatchListener(true);
    		factory.setConsumerBatchEnabled(true);
    		factory.setBatchSize(batchNum);
    		factory.setConcurrentConsumers(5); // 设置并发消费者数量为 5
    		factory.setMaxConcurrentConsumers(10); // 设置最大并发消费者数量为 10
    		return factory;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    concurrentConsumers 和 maxConcurrentConsumers 属性的具体含义如下:
    concurrentConsumers:指定同时运行的消费者数量,默认为1。
    maxConcurrentConsumers:指定允许的最大并发消费者数量,默认为1。
    因此,在上述示例中,设置了 concurrentConsumers 为 5,maxConcurrentConsumers 为 10,意味着 RabbitMQ 容器将维持一个初始的消费者池大小为 5,并在需要时最多扩展到 10 个并发消费者。

    通过以上修改,你就可以在 batchQueueRabbitListenerContainerFactory 中控制消费者的并发数量了。根据你的实际需求,可以调整并发消费者的数量以满足系统性能和资源的要求。
    需要注意的是,这种设置会影响到特定队列的消费者并发数量,而不会影响其他队列的消费者。因为你是针对特定的batchQueueRabbitListenerContainerFactory进行配置,所以只会影响使用该工厂的队列。
    如果你想配置多个工厂,可以继续添加其他的@Bean方法。

    例如,你可以添加另一个SimpleRabbitListenerContainerFactory bean,命名为anotherQueueRabbitListenerContainerFactory,并配置相应的属性:

    @Bean("anotherQueueRabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory anotherQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 配置其他属性
        return factory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    通过这种方式,你可以定义多个SimpleRabbitListenerContainerFactory bean,并分别配置每个工厂需要的属性。然后在需要使用特定工厂的@RabbitListener注解中,通过containerFactory属性指定使用哪个工厂。

    举个例子,如果你希望某个队列使用anotherQueueRabbitListenerContainerFactory工厂进行监听,可以这样设置:

    @RabbitListener(queues = "another.queue", containerFactory = "anotherQueueRabbitListenerContainerFactory")
    public void onAnotherMessage(Message message) {
        // 处理消息
    }
    
    • 1
    • 2
    • 3
    • 4

    通过这种方式,你可以根据需要定义多个工厂,并将它们分配给不同的队列进行监听。
    以下是rabbitMq监听消息代码:

    @RabbitListener(queues = "test.queue", containerFactory = "batchQueueRabbitListenerContainerFactory")
    @RabbitHandler
    public void onReportMessage(List<Message> messages) {
    	List<Map<String, Object>> list = messages.stream().map(message -> (Map<String, Object>) message.getPayload()).collect(Collectors.toList());
    	log.info("report收到数据:{}", JSON.toJSONString(list));
    	service.handler(list);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    小程序源码:独家修复登录接口社区论坛微信小程序源码下载-支持多种发帖模式超强社区
    SpringMvc执行流程
    计算机毕设(附源码)JAVA-SSM基于的图书馆管理系统
    计算机毕业设计Java演出票在线预定网站系统(源码+系统+mysql数据库+Lw文档)
    使用 JMeter 分布式性能测试
    从头训练RNN语言模型,这样的loss正常吗?
    【K8S专栏】什么是Kubernetes
    pcl基于最小切割的点云分割
    【LeetCode刷题-滑动窗口】-- 239.滑动窗口最大值
    Android简易音乐重构MVVM Java版-新增首页最近播放歌曲展示及底部音乐bar+日推功能(十六)
  • 原文地址:https://blog.csdn.net/weixin_44060488/article/details/134446877