• Spring-RabbitMQ 工作队列实践


    Springboot 版本: 2.7.0

    介绍

    工作队列可以将耗时任务分配给多个工作者(或消费者)。其背后的主要思想为避免立即执行资源密集型任务并等待其结果,相反的,我们应该让任务异步执行。
    Spring-RabbitMQ 工作队列实践

    我们可以将任务封装成消息发送到工作队列,那么在后台运行的工作者就可以获取到消息也就是获取到任务,然后去执行任务。 如果后台有多个工作者,那么这些工作者们将共享任务列表,共同完成这些任务。

    代码实现

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
        listener:
          type: simple
          simple:
            default-requeue-rejected: false
            acknowledge-mode: MANUAL
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    配置类:用于生成交换机和队列等基本Java Bean。

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
    
        public final static String TOPIC_EXCHANGE = "myExchange";
    
        public final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(jsonConverter());
            template.setExchange(TOPIC_EXCHANGE);
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (!ack) {
                    log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
                }
            });
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
            });
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, true, false);
        }
    
    
        @Bean
        public Queue queue() {
    
            return new Queue(QUEUE_NAME, true, false, false);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    
        @Bean
        public Jackson2JsonMessageConverter jsonConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    生产者:连续生成10个消息。

    @Component
    public class Publisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("my.test.message", new User("kleven", 18, i + 1), new CorrelationData());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class User implements Serializable {
        private static final long serialVersionUID = -5079682733940745661L;
    
        private String name;
        private Integer age;
        private Integer id;
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    平均分配(轮询模式)

    轮询模式:消息会被平均分配给所有的消费者。

    @Slf4j
    @Component
    public class Worker {
    
    
        @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter")
        public void worker1(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
            log.info("worker1消费 -> {}", user);
            channel.basicAck(deliveryTag, false);
        }
    
        @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter")
        public void worker2(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws InterruptedException, IOException {
            // 假设worker2每次消费需要花费5s
            Thread.sleep(5_000);
            log.info("worker2消费 -> {}", user);
            channel.basicAck(deliveryTag, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    结果:

     worker1消费 -> User(name=kleven, age=18, id=1)
     worker1消费 -> User(name=kleven, age=18, id=3)
     worker1消费 -> User(name=kleven, age=18, id=5)
     worker1消费 -> User(name=kleven, age=18, id=7)
     worker1消费 -> User(name=kleven, age=18, id=9)
     worker2消费 -> User(name=kleven, age=18, id=2)
     worker2消费 -> User(name=kleven, age=18, id=4)
     worker2消费 -> User(name=kleven, age=18, id=6)
     worker2消费 -> User(name=kleven, age=18, id=8)
     worker2消费 -> User(name=kleven, age=18, id=10)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    虽然worker2执行缓慢,但依然被分配了一半的任务。这样实际上是拖慢了整体的执行速度,正常的逻辑应该是执行速度快的工作者应该被分配到更多的任务。

    非平均分配(能者多劳)

    为了让速度快的的工作者分配到更多的任务,可以使用非平均分配。最简单的实现方式是设置prefetch=1(channel.basicQos(1)),即信道中只允许一条未确认的消息,那么消费快的工作者确认消息后马上就可以被分配下一个任务,而效率低的工作者由于信道中的消息还没有确认所以不能被分配任务,这样就实现了最基本的非平均分配,即能者多劳。

    Spring-RabbitMQ 工作队列实践

    修改后的配置文件:只需将prefetch设置为1,其他代码不变。

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
        listener:
          type: simple
          simple:
            default-requeue-rejected: false
            acknowledge-mode: MANUAL
            prefetch: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    结果:效率高的worker1被分配了9个任务,效率低的worker仅仅只被分配了1个任务。

    worker1消费 -> User(name=kleven, age=18, id=1)
    worker1消费 -> User(name=kleven, age=18, id=3)
    worker1消费 -> User(name=kleven, age=18, id=4)
    worker1消费 -> User(name=kleven, age=18, id=5)
    worker1消费 -> User(name=kleven, age=18, id=6)
    worker1消费 -> User(name=kleven, age=18, id=7)
    worker1消费 -> User(name=kleven, age=18, id=9)
    worker1消费 -> User(name=kleven, age=18, id=8)
    worker1消费 -> User(name=kleven, age=18, id=10)
    worker2消费 -> User(name=kleven, age=18, id=2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    Java如何实现统计在线人数的功能?
    bootz启动 Linux内核过程中涉及的 do_bootm_states 函数
    保险业SAP转型:奠定坚实的基础
    Elasticsearch simple(2)ElasticSearch windows本地环境搭建(V8.5.1)
    语音相似度评价
    Echarts —自定义label标签的样式(formatter,rich,添加图标等操作)
    ES6解析赋值
    攻防世界题目练习——Web引导模式(二)
    八股文复习
    k8s入门:kubernetes-dashboard 安装
  • 原文地址:https://blog.csdn.net/u012359704/article/details/126474836