• RabbitMQ高级特性 - 消息和队列TTL、死信队列


    消息和队列TTL


    概述

    a)TTL(Time To Live 过期时间),RabbitMQ 可以对消息和队列设置 TTL. 当消息到达存活时间之后,还没有被消费,就会被自动清除.

    b)注意:

    • 消息设置TTL:MQ 扫描到队列头部消息(先进先出)过期了,就会剔除消息(这里会存在一个问题:当队列头部的消息还没过期,而队列头部之后的元素过期了却不会被清理,需要等到头部元素过期被清理后,才会依次扫描后面的元素).
    • 队列设置TTL:相当于是给当前队列中的所有消息设置了一个全局过期时间,消息的 ttl 最终取决于 消息 和 队列 设置的 ttl 中的最小值.(队列设置 ttl 并非到期就删除队列,而是作用于其中的消息)
    • 消息和队列都设置TTL:只要队列过期,那么该队列中的所有消息也都会被清理. 假设队列 TTL 是 20s,消息 TTL 是 10s,那么消息的 TTL 会自动取最小值,也就是 10s.

    c)消息过期了,但由于排在此消息前面的消息没有过期,,导致过期的消息不能被清理,可能会造成什么问题?
    例如想通过 消息TTL + 死信队列 来实现 延迟队列 的效果,就可能存在漏洞:
    当前消息过期了,但是前面的消息没有过期,于是此消息依然在队列中,进而导致消息不能在过期的时候被死信队列接收到,那么进一步导致消息不能被消费者处理(如果是实现订单过期删除业务,就要出大问题了 -> 锁库存 -> 商品即使有,也卖不出去).

    实战开发

    案例:给消息和队列都设置过期时间,队列的过期时间比消息时间短.

    a)配置 交换机、队列、绑定

        @Bean
        fun ttlExchange(): DirectExchange = ExchangeBuilder
            .directExchange(MQConst.TTL_EXCHANGE)
            .build()
    
        /**
         * 消息设置过期时间有很多中方式,本质都是给扩展参数中添加 x-message-ttl
         */
        @Bean
        fun ttlQueue(): Queue = QueueBuilder
            .durable(MQConst.TTL_QUEUE)
            //.withArgument("x-message-ttl", 10 * 1000) //设置过期时间本质都是它
            .ttl(10 * 1000) // 10s 过期
            .build()
        @Bean
        fun ttlBinding(
            @Qualifier("ttlExchange") exchange: DirectExchange,
            @Qualifier("ttlQueue") queue: Queue,
        ): Binding = BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.TTL_BINDING)
    

    b)生产者接口

    import com.cyk.rabbitmq.constants.MQConst
    import org.springframework.amqp.rabbit.core.RabbitTemplate
    import org.springframework.web.bind.annotation.RequestMapping
    import org.springframework.web.bind.annotation.RestController
    
    @RestController
    @RequestMapping("/mq")
    class MQApi(
        private val rabbitTemplate: RabbitTemplate
    ) {
    
        @RequestMapping("ttl")
        fun ttl(): String {
            rabbitTemplate.convertAndSend(MQConst.TTL_EXCHANGE, MQConst.TTL_BINDING, "ttl msg 1") { msg ->
                //给 msg 配置一些属性
                val expire = 20 * 1000
                msg.messageProperties.expiration = expire.toString()
                return@convertAndSend msg
            }
            return "ok"
        }
    
    }
    

    c)效果演示
    SpringBoot 程序一启动,就可以看到交换机队列被创建,并且队列是带 ttl 的
    在这里插入图片描述
    此时向队列中发送一个过期时间是 20s 的消息,由于队列的过期时间是 10s,因此入队后,消息的过期时间变为 10s.

    死信队列


    概述

    在这里插入图片描述
    a)概念
    死信(dead message):可以简单理解为因为种种原因,无法被消费的信息,就是死信.
    死信交换机(DLX -> Dead Letter Exchange):如果普通队列设置了对应的死信交换机,那么队列中的生成的死信就会被路由到死信交换机.
    死信队列(DLQ -> Dead Letter Queue):死信交换机会根据对应的普通队列设置的 routingKey ,将死信转发给死信队列,最后交给订阅的消费者消费.

    b)消息变成死信的情况:

    1. 消息过期(消息TTL / 队列TTL) .
    2. 消息被拒绝(Basic.Reject / Basic.Nack),并且设置 requeue 参数为 false.
    3. 队列设置最大长度(maxLength).

    c)死信队列解决消息积压问题:
    消费者再处理消息时,可能会因为以下原因,异常处理消息:

    • 网络短暂故障:通过 nack + requeue=true 可以实现消息重新入队,解决网路短暂故障.
    • 代码逻辑错误:如果还是 nack + requeue=true,就会导致消息不断重复,不仅导致负载飙升,还是导致消息积压. 那么这里就可以使用 nack + requeue=false + 死信队列 的方式,保证了消息可以被正确处理.

    d)应用场景:
    例如用户下订单,发送订单到普通队列,超过一定时间用户没有支付,消息就会变成死信. 此时就可以用一个消费者来处理死信,修改订单状态为取消.

    e)注意事项(和之前 消息 TTL 同理):
    例如想通过 消息TTL + 死信队列 来实现 延迟队列 的效果,就可能存在漏洞:
    当前消息过期了,但是前面的消息没有过期,于是此消息依然在队列中,进而导致消息不能在过期的时候被死信队列接收到,那么进一步导致消息不能被消费者处理(如果是实现订单过期删除业务,就要出大问题了 -> 锁库存 -> 商品即使有,也卖不出去).

    实战开发

    a)分别指定 普通 和 死信 交换机、队列、绑定

    @Configuration
    class DLConfig {
    
        @Bean
        fun normalExchange(): DirectExchange = ExchangeBuilder
            .directExchange(MQConst.NORMAL_EXCHANGE)
            .build()
        @Bean
        fun normalQueue(): Queue = QueueBuilder
            .durable(MQConst.NORMAL_QUEUE)
            .deadLetterExchange(MQConst.DL_EXCHANGE)
            .deadLetterRoutingKey(MQConst.DL_BINDING)
            .ttl(10 * 1000)
            //.maxLength(10L)  //队列设置最大长度
            .build()
        @Bean
        fun normalBinding(
            @Qualifier("normalExchange") exchange: Exchange,
            @Qualifier("normalQueue") queue: Queue,
        ): Binding = BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.NORMAL_BINDING)
            .noargs() //如果 交换机 是顶级接口,这里需要 noargs()
    
        @Bean
        fun dlExchange(): DirectExchange = ExchangeBuilder
            .directExchange(MQConst.DL_EXCHANGE)
            .build()
        @Bean
        fun dlQueue(): Queue = QueueBuilder
            .durable(MQConst.DL_QUEUE)
            .build()
        @Bean
        fun dlBinding(): Binding = BindingBuilder
            .bind(dlQueue())
            .to(dlExchange())
            .with(MQConst.DL_BINDING)
    
    }
    

    b)生产者接口

        @RequestMapping("dl")
        fun dl(): String {
            rabbitTemplate.convertAndSend(MQConst.NORMAL_EXCHANGE, MQConst.NORMAL_BINDING, "dl msg 1")
            return "ok"
        }
    

    c)演示效果
    触发生产者接口,可以看到前 10s,如下
    在这里插入图片描述
    消息过期之后,可以看到消息变为死信,最终路由到死信队列,如下:
    在这里插入图片描述

  • 相关阅读:
    面向OLAP的列式存储DBMS-6-[ClickHouse]的常用DDL操作
    SpringCloud Alibaba--nacos配置中心
    DevOps CI/CD之一: Jenkins和Github
    大学生想做兼职应该怎么找,适合大学生的线上线下靠谱兼职推荐
    Zookeeper(二)—集群
    Nginx搭载负载均衡及前端项目部署
    一文讲清 c++ 之队列
    Spring 事务编程实践
    5个免费商用音频素材网站
    #机器学习--补充数学基础--概率论
  • 原文地址:https://blog.csdn.net/CYK_byte/article/details/141091818