• 【学习笔记】RabbitMQ03:DLX 死信交换机 死信队列 以及示例代码


    参考资料

    五、TTL过期消息

    ttl:time to live ,顾名思义,就是过期消息,在指定时间内没有被接受的消息,就会过期。成为过期消息,或死信。

    5.1 设置单条消息的过期时间

    单条消息的过期时间只决定了 没有任何消费者消费时,消息可以存活多久

    5.1.1 具体写法

    具体操作就是使用Message对象中的Properties去设置过期时间。如下

    @GetMapping("/{msg}")
    public void send(@PathVariable String msg){
        MessageProperties messageProperties = new MessageProperties();
        // 设置过期时间:单位:毫秒
        messageProperties.setExpiration("15000");
        Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(messageProperties).build();
        rabbitTemplate.convertAndSend(DirectExchangeConfig.exchangeName , "info" , message);
        log.info("(ttl)发送消息 :{} , 过期时间 :{}" ,msg , LocalDateTimeUtil.of(System.currentTimeMillis()+15000));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    发送到之前的直连交换器

    5.1.2 测试

    访问路径/ttl/一条会过期的消息到控制台查看

    image-20231016150707075

    过一段时间再查看就无了。

    5.2 通过队列属性,设置消息过期时间

    队列的过期时间决定了 在没有任何消费者的情况下,队列中的消息可以存活多久。

    注意事项

    • 如果消息和对列都设置过期时间,则消息的 TTL 以两者之间较小的那个数值为准
    5.2.1 具体写法
    • 需要在配置队列时,就填入过期时间

    • 配置的方式其中一个以map的方式插入,这个配置方式可以配置很多不同的选项。

    • 在控制台的这个位置可以查询

      image-20231016151600482

    • 以过期时间为例

      sp20231016_153105_172

    两种配置方法

    @Bean
    // 方式1:使用map将参数传入
    public Queue queueTTLA(){
        Map<String , Object> map = new HashMap<>();
        map.put("x-message-ttl",15000);
        return QueueBuilder.durable("xcong.queue.ttl.A").withArguments(map).build();
    }
    @Bean
    // 方式2:利用QueueBuilder
    public Queue queueTTLB(){
        return QueueBuilder.durable("xcong.queue.ttl.B").ttl(15000).build();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    设置一个接口用于测试
    @RestController
    @Slf4j
    @RequestMapping("/ttl")
    public class TTLController {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
    
        @GetMapping("/q/{msg}")
        public void sendQ(@PathVariable String msg){
            rabbitTemplate.convertAndSend("xcong.fanout","",msg.getBytes(StandardCharsets.UTF_8));
            log.info("(TTL队列)成功发送消息 {} " ,msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    5.2.2 测试用例

    访问接口/ttl/q/一条会过期的消息

    image-20231016153041967

    对比正常的队列,可以发现,ttl队列的消息会自动过期

    image-20231016153515095


    六、RabbitMQ DLX (死信交换机 )

    6.1 概述

    即DLX(Dead-Letter-Exchange)。也称为:死信交换机、死信邮箱。

    如下情况后,消息会进入死信交换机中。并进一步被安排到死信队列里,消费者也可以从死信队列中获取消息。

    image-20231016154033403

    6.2 死信队列的应用场景

    很常见的买票下订单场景。比如一个用户下了订单买票,需要在30分钟内完成支付。

    如果超过30分钟没有完成,就会发送消息(比如短信)通知用户,并修改订单状态(为未支付),还需要是否存票让别人购买(修改库存信息)。

    而这后续的一系列操作,就可以设置一个消费者去监听死信队列来专门完成。

    6.3 死信队列示例:队列过期

    6.3.1 思路整理
    1. 根据流程图,我们需要设置2个交换机,一个为带过期时间的正常的交换机(TTL),另一个则作为死信交换机(DLX。

    2. 注意:死信交换机也只是一个普通的交换,用法和命名是一样的。

    3. 为了便于测试,需要分别声明两个不同的队列,和上面的交换机分别绑定。至此前置设置完毕。

    4. 核心关键是:如何让正常队列的信息过期后进入到死信队列。使用到的参数如下:

      image-20231016163503518

      image-20231016163534568

      image-20231016163554498

      • 可选的死信交换器名称,如果消息被拒绝或过期,将重新发布到该交换器。(目前只涉及到消息过期,后面还有涉及到拒绝)
    • 可选的死信路由键,当消息是死信时使用。如果没有设置,则将使用消息的原始路由密钥。
    6.3.2 配置示例
    @Configuration
    public class DlxExchangeConfig {
    
        public static String ttlXName = "xcong.dlx.ttl";
        public static String ttlKey = "normal";
        public static String dlxXName = "xcong.dlx.dlx";
        public static String dlxKey = "dead";
    
        @Bean
        public DirectExchange ttlExchange(){
            return ExchangeBuilder.directExchange(ttlXName).build();
        }
    
        @Bean DirectExchange dlxExchange(){
            return ExchangeBuilder.directExchange(dlxXName).build();
        }
    
        
        @Bean
        // 配置普通的过期队列
        public Queue queueTTL(){
            // 设置一个15秒过期的队列
            // 并且要求指定dlx交换机和dlx的key
            Map<String , Object> config = new HashMap<>();
            config.put("x-message-ttl" , 15000);
            config.put("x-dead-letter-exchange" , dlxXName);
            config.put("x-dead-letter-routing-key" , dlxKey);
            return QueueBuilder.durable("queue.dlx.ttl").withArguments(config).build();
        }
    
        @Bean
        public Binding bindingTTL(){
            return BindingBuilder.bind(queueTTL()).to(ttlExchange()).with(ttlKey);
        }
    
        @Bean
        // 配置死信队列
        public Queue queueDLX(){
            return QueueBuilder.durable("queue.dlx.dead").build();
        }
    
        @Bean
        public Binding bindingDLX(){
            return BindingBuilder.bind(queueDLX()).to(dlxExchange()).with(dlxKey);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    接口代码

    @RestController
    @RequestMapping("/dlx")
    @Slf4j
    public class DLXController {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/{msg}")
        public void sentErrorMsg(@PathVariable("msg") String msg ){
            log.info("准备发送的信息:{} , 路由键 :{}",msg , ttlKey);
            // 发送到普通的延时列表中
            rabbitTemplate.convertAndSend(ttlXName , ttlKey , msg.getBytes(StandardCharsets.UTF_8));
            log.info("成功发送!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    6.3.3 测试

    发送消息,查看后台

    image-20231016165900787

    image-20231016165930223

    查看死信队列

    image-20231016170128556

    6.4 死信队列示例:消息过期

    6.5 死信队列示例:超过队列最大长度

    当队列的信息达到最大长度时,先入队的消息会被发送到DLX。

    6.5.1 代码示例
    • 大致配置和上面的案例类似

    • 只需要将tll队列设置最大队列长度即可(不设置过期时间

      • 队列在开始从其头部丢弃消息之前可以包含多少条(就绪)消息。

    示例思路

    1. 给最大长度为 5的普通队列,发送8条消息
    2. 查看队列的情况。

    结果:消息123进入死信(先进先出),45678依然在put队列中

    6.6 死信队列示例:消费者拒绝消息,不进行重新投递

    从正常的队列接受消息,但是对消息不进行确认,并且不对消息进行重新投递。

    此时消息就会进入死信队列。

    这里设计到一个新技术点:消费者手动确认消息

    6.6.1 消费者:开启手动确认消息

    核心配置

    在yml中添加如下,开启手动确认

    spring:
      rabbitmq:
        listener:
          simple:
            # 手动acks-用户必须通过通道感知侦听器进行ack/nack。
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    6.6.2 消费者:如何手动确认

    接收消息的代码就需要利用Channel(信道)。核心的方法如下:

    • 确认 channel.basicAck(long deliveryTag, boolean multiple)
      • deliveryTag:这次信息的唯一标识(从MessageProperties获取)
      • multiple:true——确认所有信息,包括提供的交付标签;false——只确认提供的交货标签
    • 拒绝channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
      • requeue : true——如果被拒绝的邮件应该重新排队,而不是用信件丢弃
        @RabbitListener(queues = {"xcong.fanout.A", "xcong.fanout.B", "xcong.direct.C", "xcong.direct.D"})
        public void reviverMsg(Message message, Channel channel) {
            byte[] body = message.getBody();
            String result = new String(body);
            MessageProperties messageProperties = message.getMessageProperties();
            // 获取消息传递的唯一标签
            long deliveryTag = messageProperties.getDeliveryTag();
            log.info("接收到的消息:{}", result);
           
            try {
                 // 进行确认
                channel.basicAck(deliveryTag , false);
            } catch (Exception e) {
                try {
                    // 进行拒绝
                    channel.basicNack(deliveryTag , false ,true);
                    log.error("遇到异常,拒绝消息:{}" ,e);
                } catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
             
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    6.6.3 示例:消费者确认消息后,正常重新入队
    6.6.3.1 生产者代码
    // 略,沿用之前的直连交换机进行测试
    
    • 1

    测试前进入控制台确保消息都已经被消费

    image-20231017091303361

    6.6.3.2 实例思路
    1. 设置一个转换异常:尝试将接收到的消息转为int
    2. 当捕捉到转换异常时,拒绝消息
    3. 查看异常情况。

    预计情况,消息会不断的返回队列并再次接受(记得打断点

    6.6.3.4 ⭐️消费者代码
    @Service
    @Slf4j
    public class ConsumerService {
    
        @RabbitListener(queues = {"xcong.direct.C", "xcong.direct.D"})
        public void reviverMsg(Message message, Channel channel) {
            byte[] body = message.getBody();
            String result = new String(body);
            MessageProperties messageProperties = message.getMessageProperties();
            // 获取消费者队列名称
            String consumerQueue = messageProperties.getConsumerQueue();
            // 获取接受者交换机名称
            String receivedExchange = messageProperties.getReceivedExchange();
            // 获取消息传递的唯一标签
            long deliveryTag = messageProperties.getDeliveryTag();
            log.info("接收到的消息:{}", result);
            log.info("消息队列 :{} , 交换机名称:{}", consumerQueue, receivedExchange);
            log.info("唯一标识:{}", deliveryTag);
    
            try {
                // 模拟异常
                Integer.parseInt(result);
                // 进行手动确认,并关闭批量确认
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                try {
                    // 进行手动拒绝,并关闭批量拒绝,同时返回队列
                    channel.basicNack(deliveryTag, false, true);
                    log.error("遇到异常,异常信息:{}", e.getLocalizedMessage());
                } catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    6.6.3.5 测试结果
    1. 接收到消息

      image-20231017092457318

    2. 模拟异常情况:进入basicNack方法

    3. 此时,进入rabbit控制台可以看到队列情况

      image-20231017092643063

    4. 如果正常的情况:进入basicAck方法。查看控制台即可发现

      image-20231017092927929

    6.6.4 消费者:拒绝(reject)消息,进入死信队列

    前面我们使用basicNack方法来“拒绝”消息,还有一种拒绝方法叫reject

    • nack这个单词的准确含义为“不做应答”。
    6.6.4.1 nack和reject的区别

    在 RabbitMQ 中,nack 和 reject 消息都可以用于拒绝消息的处理。它们的不同之处在于:

    • 当消费者使用 basic.reject 拒绝消息时,消息会被立即丢弃,不会被重新排队。这意味着该消息将永远不会被消费者接收到。
    • 当消费者使用 basic.nack 拒绝消息时,消息可以被重新排队或者被丢弃。basic.nack 可以接受三个参数:requeuemultipledelivery_tag
      • requeue 参数控制着消息是否应该重新排队
      • multiple 参数控制着是否确认多个消息
      • delivery_tag 参数则指定了要拒绝的消息。

    总的来说,如果你希望消息能够重新排队并稍后重新处理,那么应该使用 basic.nack。如果你希望消息被永久地丢弃,那么应该使用 basic.reject

    6.6.4.2 ⭐️代码实例:拒绝消息进入死信队列

    以消费前面案例中的queue.dlx.ttl队列中信息为例

    image-20231017113639007

    @RabbitListener(queues = "queue.dlx.ttl")
    public void rejectMsg(Message message, Channel channel) {
        String str = new String(message.getBody());
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        log.info("接到准备拒绝的消息 : {} , tag :{} 。。。。5秒后拒绝", str , deliveryTag);
        try {
            ThreadUtil.safeSleep(5000L);
            channel.basicReject(deliveryTag, false);
            log.info("拒绝消息:{}", str);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    
    @RabbitListener(queues = "queue.dlx.dead")
    public void receiveDeadMsg(Message message , Channel channel) throws IOException {
        String str = new String(message.getBody());
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        log.info("接收到的死信 : {} , tag :{} ", str , deliveryTag);
        channel.basicAck(deliveryTag , true);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    6.6.4.3 运行结果

    访问端口/dlx/一条新的信息:2023年10月17日11:42:13,发送一条信息。

    1. 接受消息,但是还没确认

      image-20231017114408030

    2. 拒绝消息,死信未确认

      image-20231017114516495

    3. 死信消费者ack后

      image-20231017114607329

    控制台日志打印如下

    image-20231017114628499

    image-20231017114640808

    6.7 死信队列小结

    1. 死信队列和普通的队列一样,可以在通过配置后来接收死信。
    2. 死信出现的几个原因:
      • 消息过期
      • 超过队列最大长度(先进先出
      • 消费者拒绝消息(reject)

  • 相关阅读:
    Qt源码解析-源码解析-QVideoWidget播放手机视频旋转问题
    技术干货 | 一文弄懂差分隐私原理!
    如何获取k8s容器里运行的jar包
    ES6之Set集合(通俗易懂,含实践)
    记录一次用宝塔部署微信小程序Node.js后端接口代码的详细过程
    这篇文章告诉你视频转音频软件哪个好用,有需自取
    qt_vs_tools 设置
    关于I/O——内存与CPU与磁盘之间的关系
    104.(前端)分类管理增加优化——elementui按钮禁用、清除数据后同时清空查询的内容并处理数据不完整报错
    Elastic 发布 Elasticsearch Relevance Engine™ — 为 AI 革命提供高级搜索能力
  • 原文地址:https://blog.csdn.net/Xcong_Zhu/article/details/133886732