• (18)不重启服务动态停止、启动RabbitMQ消费者


            我们在消费RabbitMQ消息的过程中,有时候可能会想先暂停消费一段时间,然后过段时间再启动消费者,这个需求怎么实现呢?我们可以借助RabbitListenerEndpointRegistry这个类来实现,它的全类名是org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry,通过这个类可以实现全部队列消息的启动、停止消费,也可以实现指定队列消息的启动、停止消费。具体的原因感兴趣的话可以参考一下我前面的这篇博客(17)不重启服务动态调整RabbitMQ消费者数量,里面有相应的源码分析

    停止、启动全部队列消费

            RabbitListenerEndpointRegistry类提供了start()方法和stop()方法,可以看到底层都是通过调用getListenerContainers()获取到所有队列的消费监听容器列表,然后遍历挨个调用对应的start()方法和stop()方法。

    1. @Override
    2. public void start() {
    3. for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    4. startIfNecessary(listenerContainer);
    5. }
    6. }
    7. @Override
    8. public void stop() {
    9. for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    10. listenerContainer.stop();
    11. }
    12. }

            我们只需要获取到RabbitListenerEndpointRegistry对象,然后调用其start()方法和stop()方法即可实现启动/停止所有队列消费。

            实现代码如下所示:

    1. @Resource
    2. RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    3. @RequestMapping(value = "/startStopAllConsumer")
    4. @ApiOperation(value = "启动/暂停全部队列消息消费")
    5. public Response startStopAllConsumer(@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {
    6. log.info("启动/暂停全部队列消息消费,consumeSwitch:{}",consumeSwitch);
    7. if(consumeSwitch){
    8. rabbitListenerEndpointRegistry.start();
    9. }else {
    10. rabbitListenerEndpointRegistry.stop();
    11. }
    12. return Response.success();
    13. }

            传入开关参数为false,会停止所有队列消费者消费,调用后控制台看到如下日志

    2023-09-04 19:43:11.480 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  c.b.t.m.p.w.PayCashierMockController:67 - 启动/暂停全部队列消息消费,consumeSwitch:false
    2023-09-04 19:43:11.556 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
    2023-09-04 19:43:12.352 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
    可以看到消息监听容器关闭的日志,然后再传入开关参数为true,调用后会启动所有队列消息消费。

    停止、启动指定队列消费

            上面提到了RabbitListenerEndpointRegistry.getListenerContainers()可以获取到所有队列的消费监听容器列表,我们可以使用MessageListenerContainer中获取消费的队列名进行判断,以实现指定队列的停止、启动消费。

            实现代码如下所示:

    1. @Resource
    2. RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    3. @RequestMapping(value = "/startStopConsumer")
    4. @ApiOperation(value = "启动/暂停指定队列消息消费")
    5. public Response startStopConsumer(@RequestParam(value = "queueName", required = false) String queueName,
    6. @RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {
    7. log.info("启动/暂停指定队列消息消费,consumeSwitch:{},queueName:{}",consumeSwitch,queueName);
    8. //获取所有消息监听容器
    9. Collection listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
    10. for (MessageListenerContainer container : listenerContainers) {
    11. SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
    12. //消息监听容器要消费的队列名称集合
    13. List queueNamesList = Arrays.asList(con.getQueueNames());
    14. //判断容器中的队列名称是否包含需要调整的队列名参数
    15. if (queueNamesList.contains(queueName)) {
    16. if(consumeSwitch){
    17. con.start();
    18. }else{
    19. con.stop();
    20. }
    21. }
    22. }
    23. return Response.success();
    24. }

    传入开关参数为false,停止pay_work_notify队列消费者消费,调用后控制台看到如下日志

    2023-09-04 19:51:37.130 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  c.b.t.m.p.w.PayCashierMockController:80 - 启动/暂停指定队列消息消费,consumeSwitch:false,queueName:pay_work_notify
    2023-09-04 19:51:37.200 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
    2023-09-04 19:51:37.903 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
    可以看到消息监听容器关闭的日志,然后再传入开关参数为true,调用后会启动pay_work_notify队列消息消费。

  • 相关阅读:
    Cadence 16.6 PCB Edito如何将鼠标中键反向拉拽改为正向拖拽
    售后处置跟踪系统设想
    根号2是无理数的两种证明以及如何计算根号2的值
    vue监听div的高度变化
    北斗三号短报文终端露天矿山高边坡监测方案
    nn.functional.normalize
    java 8 新特性
    Linux命令(126)之help
    矢量场的旋度和散度
    Prometheus+Grafana监控
  • 原文地址:https://blog.csdn.net/u012988901/article/details/132675757