• Spring-RabbitMQ 死信队列实践


    Springboot 版本: 2.7.0

    死信概念

    死信,即无法被正常消费的消息。 正常情况下到达中间件队列中的消息都应该也都会被消费者消费,但如果因为一些原因导致消息没有被消费,那么这个消息就被认为是死信了。

    死信来源

    1. 消息 TTL 过期。
    2. 队列达到最大长度, 新的消息无法再添加到队列中。
    3. 消息被拒绝或者消费失败,且不再重新入队。

    使用场景

    1. 延时队列:正常队列不消费,让消息过期后进入死信队列,消费者从死信队列消费以达到延时效果。
    2. 保证业务数据不丢失:当消费者消费异常时,将异常消息加入到死信队列,由特定程序或人工处理。

    代码实践

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    配置类:配置交换机和队列

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
    
        public final static String TOPIC_EXCHANGE = "myExchange";
    
        public final static String QUEUE_NAME = "myQueue";
    
        public final static String DEAD_EXCHANGE = "myDeadExchange";
    
        public final static String DEAD_QUEUE = "myDeadQueue";
    
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(jsonConverter());
            template.setExchange(TOPIC_EXCHANGE);
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息:{}发送成功", correlationData.getId());
                } else {
                    log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
                }
            });
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
            });
            return template;
        }
    
        // 申明一个常规的交换机
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, true, false);
        }
    
    
    
        // 申明一个常规使用的队列
        @Bean
        public Queue queue() {
    
            // 常规队列与死信交换机的绑定关系
            Map<String, Object> queueParams = new HashMap<>(3);
            queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            queueParams.put("x-dead-letter-routing-key","my.dead.letter");
            //设置队列长度为5
            queueParams.put("x-max-length", 5);
    
            return new Queue(QUEUE_NAME, true, false, false, deadLetterParams);
        }
    
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    
        @Bean
        public Jackson2JsonMessageConverter jsonConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
    
    
        // 申明一个死信交换机
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(DEAD_EXCHANGE, true, false);
        }
    
    
        // 申明一个死信队列
        @Bean
        public Queue deadQueue() {
            return new Queue(DEAD_QUEUE);
        }
    
    
        // 绑定死信交换机和死信队列
        @Bean
        public Binding deadBinding() {
            return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("my.dead.letter");
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    运行项目,会自动生成2个Exchange和2个Queue, 如下图所示:
    Spring-RabbitMQ 死信队列实践
    Spring-RabbitMQ 死信队列实践
    D: durable, 表示持久的。
    DLX: x-dead-letter-exchange = myDeadExchange,表示这个队列关联到的死信交换机。
    DLK: x-dead-letter-routing-key = my.dead.letter,表示这个队列的死信 routingKey, 根据这个routingKey将死信路由到相应的死信队列

    TTL过期,导致死信

    生产者代码:设置太ttl=10s,没有消费者。

    @Component
    public class Publisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private Jackson2JsonMessageConverter jsonConverter;
        
        public void send(){
            CorrelationData correlationData = new CorrelationData();
            MessageProperties messageProperties = new MessageProperties();
            //ttl 10s
            messageProperties.setExpiration("10000");
    
            Message message = jsonConverter.toMessage(new User("Kleven", 18), messageProperties);
            rabbitTemplate.convertAndSend("my.test.message", message, correlationData);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    结果:生产者发布运行10s后,由于没有消费者消息,消息过期进入死信队列。
    Spring-RabbitMQ 死信队列实践

    队列长度达到限制,导致死信

    生成者:发送10条消息,因为配置类通过 “x-max-length” 参数限制了队列长度为5,所以有5条消息会进入死信队列。

    @Component
    public class PublisherForMaxQueueLength {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        public void send(){
            int age = 18;
    
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("my.test.message", new User("kleven", age++), new CorrelationData());
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    结果:前5条信息(age = 18 - 22)进入死信队列。
    Spring-RabbitMQ 死信队列实践

    Lim: x-max-length = 5,代表队列长度为5。

    消费者拒绝或消费失败,导致死信

    生产者:生成5个消息,年龄从16开始。

    @Component
    public class PublisherForReject{
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        public void send(){
            int age = 16;
    
            for (int i = 0; i < 5; i++) {
                rabbitTemplate.convertAndSend("my.test.message", new User("kleven", age++), new CorrelationData());
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    消费者:用户年龄大于18才能被消费。

    @Slf4j
    @Component
    public class Consumer {
        // 正常消费
        @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter", ackMode = "MANUAL")
        public void normalConsumer(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
            int age = user.getAge();
            if (age >= 18) {
                log.info("正常消费者消费 -> {}", user);
                channel.basicAck(deliveryTag, false);
            } else {
                channel.basicNack(deliveryTag, false, false);
            }
        }
    
        // 死信消费
        @RabbitListener(queues = "myDeadQueue", messageConverter = "jsonConverter")
        public void deadConsumer(User user) {
            log.info("死信消费者消费 -> {}", user);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    结果:年龄18,19,20被正常消费,16,17被当做死信处理。
    Spring-RabbitMQ 死信队列实践

  • 相关阅读:
    Win11怎么修改关机界面颜色?Win11修改关机界面颜色的方法
    这是我见过史上最强的spring全家桶笔记,在GitHub上两天破百万瞬间爆火
    LeetCode 697. Degree of an Array
    java数据结构与算法刷题-----LeetCode58:最后一个单词的长度
    QPushButton 样式使用示例(以及按钮setmenu添加下拉菜单的方法)
    Codeforces Global Round 23 E CF1746E Joking (Hard Version)
    现货黄金知识知多少(上)
    使用Jmeter遇到随机取值的需求怎么办?
    图文解释各种电阻阻值的读取方法【硬件篇】
    scratch学习相关资料汇总
  • 原文地址:https://blog.csdn.net/u012359704/article/details/126428118