• Spring-RabbitMQ 队列长度限制实践


    Springboot 版本: 2.7.0

    超出队列限制后会发生什么?

    1. 丢弃旧消息:如果没有配置关联死信队列,则丢弃最老的消息。
    2. 将旧消息路由到死信队列:如果配置有关联的死信队列,则将最老的消息路由到死信队列。
    3. 拒绝新消息入队:如果默认行为不满足需求,可以通过参数 overflow 进行修改。
      • reject-publish:拒绝最新的消息发布。如果生成者配置有消息确认,那么broker会异步通知生产者消息发送失败。
      • reject-publish-dlx:除了同reject-publish相同的功能外,还会拒绝死信消息。

    怎么设置队列长度?

    有两种方式:

    1. 服务端通过policy设置
    2. 客户端在队列声明时使用队列的可选参数进行配置

    如果服务端和客户端都做了设置,那么以二者中的小值为准。

    服务端通过policy设置

    命令行配置

    配置命令:

    rabbitmqctl set_policy my-pol "^myQueue$" '{"max-length":5, "max-length-bytes":1048576, "overflow":"reject-publish"}'   --apply-to queues --vhost my_vhost
    
    • 1
    1. name: my-pol
    2. pattern: ^myQueue$
    3. definition:
      1. max-length: 5; 最多包含5个消息
      2. max-length-bytes:1048576; 最多包含1MiB的消息数据
      3. overflow:reject-publish; 超出限制后直接拒绝新的消息入队

    配置结果:

    Spring-RabbitMQ 队列长度实践
    Spring-RabbitMQ 队列长度实践

    管理页面配置

    填写内容和命令行是一样的,其结果也是一样的。
    Spring-RabbitMQ 队列长度限制实践

    客户端申明队列时配置

        @Bean
        public Queue queue() {
    
            // 常规队列与死信交换机的绑定关系
            Map<String, Object> queueParams = new HashMap<>(2);
            //设置队列长度为5
            queueParams.put("x-max-length", 5);
            queueParams.put("max-length-bytes", 1048576);
    
            return new Queue(QUEUE_NAME, true, false, false, queueParams);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    代码实践

    只限制消息长度(丢弃旧消息)

    配置文件

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
        listener:
          type: simple
          simple:
            default-requeue-rejected: false
            acknowledge-mode: MANUAL
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    配置类

    定义交换机、队列以及他们之间的绑定关系,并开启生产者消息确认。

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
    
       public final static String TOPIC_EXCHANGE = "myExchange";
    
       public final static String QUEUE_NAME = "myQueue";
    
    
       @Bean
       public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
          return new RabbitAdmin(connectionFactory);
       }
    
    
       @Bean
       public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
          RabbitTemplate template = new RabbitTemplate(connectionFactory);
          template.setMessageConverter(jsonConverter());
          template.setExchange(TOPIC_EXCHANGE);
          template.setConfirmCallback((correlationData, ack, cause) -> {
             if (ack) {
                log.info("消息:{}发送成功", correlationData.getId());
             } else {
                log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
             }
          });
    
          template.setMandatory(true);
          template.setReturnsCallback(returned -> {
             log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
          });
          return template;
       }
    
       // 申明一个常规的交换机
       @Bean
       public TopicExchange topicExchange() {
          return new TopicExchange(TOPIC_EXCHANGE, true, false);
       }
    
    
       // 申明一个常规使用的队列
       @Bean
       public Queue queue() {
    
          // 常规队列与死信交换机的绑定关系
          Map<String, Object> queueParams = new HashMap<>(2);
          //设置队列长度为5
          queueParams.put("x-max-length", 5);
          queueParams.put("x-max-length-bytes", 1048576);
          return new Queue(QUEUE_NAME, true, false, false, queueParams);
       }
    
       @Bean
       public Binding binding() {
          return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
       }
    
       @Bean
       public Jackson2JsonMessageConverter jsonConverter() {
          return new Jackson2JsonMessageConverter();
       }
       
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    生产者

    连续发布10个消息。

    @Component
    public class Publisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(){
    
            for (int i = 0; i < 10; i++) {
                User user = new User("kleven", 18, i+1);
                rabbitTemplate.convertAndSend("my.test.message", user, new CorrelationData(user.getId().toString()));
            }
        }
    
    }
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class User implements Serializable {
       private static final long serialVersionUID = -5079682733940745661L;
    
       private String name;
       private Integer age;
       private Integer id;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = {App.class})
    public class QueueLengthTest {
    
        @Autowired
        private Publisher publisher;
    
    
        @Test
        public void testSend(){
            publisher.send();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    测试结果

    10个消息都发布成功,但是队列中只有后5个消息,前5个消息被丢弃。
    Spring-RabbitMQ 队列长度限制实践
    Spring-RabbitMQ 队列长度限制实践

    限制消息长度,并配置死信队列(将旧消息路由到死信队列)

    修改配置类增加死信队列,其他保持不变。

    配置类

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
    
        public final static String TOPIC_EXCHANGE = "myExchange";
    
        public final static String QUEUE_NAME = "myQueue";
    
        public final static String DEAD_EXCHANGE = "myDeadExchange";
    
        public final static String DEAD_QUEUE = "myDeadQueue";
    
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(jsonConverter());
            template.setExchange(TOPIC_EXCHANGE);
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息:{}发送成功", correlationData.getId());
                } else {
                    log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
                }
            });
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
            });
            return template;
        }
    
        // 申明一个常规的交换机
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, true, false);
        }
    
    
    
        // 申明一个常规使用的队列
        @Bean
        public Queue queue() {
    
            // 常规队列与死信交换机的绑定关系
            Map<String, Object> queueParams = new HashMap<>(4);
            queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            queueParams.put("x-dead-letter-routing-key","my.dead.letter");
            //设置队列长度为5
            queueParams.put("x-max-length", 5);
            queueParams.put("x-max-length-bytes", 1048576);
            return new Queue(QUEUE_NAME, true, false, false, queueParams);
        }
    
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    
        @Bean
        public Jackson2JsonMessageConverter jsonConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
    
    
        // 申明一个死信交换机
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(DEAD_EXCHANGE, true, false);
        }
    
    
        // 申明一个死信队列
        @Bean
        public Queue deadQueue() {
            return new Queue(DEAD_QUEUE);
        }
    
    
        // 绑定死信交换机和死信队列
        @Bean
        public Binding deadBinding() {
            return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("my.dead.letter");
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    测试结果

    10个消息均发布成功,前5个旧的消息进入死信队列,后5个消息在常规业务队列。

    Spring-RabbitMQ 队列长度限制实践
    Spring-RabbitMQ 队列长度限制实践

    限制消息长度,并配置 overflow (拒绝新消息入队)

    修改配置类增加overflow配置,增加一个消费者,其他保持不变。

    配置类

        @Bean
        public Queue queue() {
    
            // 常规队列与死信交换机的绑定关系
            Map<String, Object> queueParams = new HashMap<>(5);
            queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            queueParams.put("x-dead-letter-routing-key","my.dead.letter");
            //设置队列长度为5
            queueParams.put("x-max-length", 5);
            queueParams.put("x-max-length-bytes", 1048576);
            queueParams.put("x-overflow", "reject-publish");
            return new Queue(QUEUE_NAME, true, false, false, queueParams);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者

    @Slf4j
    @Component
    public class Consumer {
    
    
        @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter")
        public void normalConsumer(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
            // 假设消费者消费一条消息需要2s
            Thread.sleep(2_1000);
            log.info("正常消费者消费 -> {}", user);
            channel.basicAck(deliveryTag, false);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    测试结果

    前6个消息发送成功;后4个消息因为被拒绝所以发送失败。
    Spring-RabbitMQ 队列长度限制实践
    Spring-RabbitMQ 队列长度限制实践

    为什么限制的长度是5却有6个消息发送成功呢?
    原因是队列长度(及所占字节数)限制只针对Ready状态的消息,有上图可知,因为我们这次加了一个消费者,其正在消费一个消息但还没有确认,所以有一个消息的状态是Unacked。

  • 相关阅读:
    Ajax获取JSON数据失败是为什么
    4、模板(二叉树,红黑树,STL的实现)
    Vue插槽slot详解
    全国的科技创新情况数据分享,涵盖2020-2022年三年情况
    java抽象类
    外贸爬虫系统
    凸印的印刷原理及工艺介绍
    以训辅教,以战促学 | 新版攻防世界平台正式上线运营!
    python笔记_程序流程控制
    Ubuntu22.04启用root账户 2208120941
  • 原文地址:https://blog.csdn.net/u012359704/article/details/126514226