• RabbitMQ(八)【高级 - 过期时间 TTL】


    八、RabbitMQ高级 - 过期时间 TTL


    上一篇文章SpringBoot案例

    概述

    过期时间 TTL 表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对 消息和队列 设置 TTL,目前有两种方法可以设置

    • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
    • 第二种方法是对消息进行单独设置,每条消息 TTL 可以不同

    如果上述两种方式同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息队列的生存时间一旦超过设置的 TLL 值,就称为 dead message 被投递到死信队列,消费者将无法再收到该消息

    8.1 队列属性设置过期时间

    队列过期时间 - 代码测试

    1. springboot-order-rabbitmq-producer工程下的config包下,新建TTLRabbitMQConfiguration.java
    package com.vinjcent.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class TTLRabbitMQConfiguration {
    
    
        // 1.声明注册direct模式交换机
        @Bean
        public DirectExchange ttl_directExchange(){
            return new DirectExchange("ttl_order_exchange", true, false);
        }
    
    
        // 2.声明队列
        @Bean
        public Queue ttl_accountQueue(){
            // 设置过期时间
            Map<String, Object> args = new HashMap<>();
            // 根据参数说明设置对应参数
            args.put("x-message-ttl",5000);
            return new Queue("account.ttl.queue", true, false, false, args);
        } 
        // 3.完成绑定关系(队列)
        @Bean
        public Binding ttl_accountBinding(){
            return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    1. service包下的OrderServiceImpl.java新增 ttl 函数
    package com.vinjcent.rabbitmq.service;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @SuppressWarnings("all")
    @Service
    public class OrderServiceImpl {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // fanout模式
        public void createOrderFanout(String userId, String productId, int num){
            //......
        }
    
        // direct模式
        public void createOrderDirect(String userId, String productId, int num){
            //......
        }
    
        // topic
        public void createOrderTopic(String userId, String productId, int num){
            //......
        }
    
        // ttl
        public void createOrderTtl(String userId, String productId, int num){
            // 1.根据商品id查询库存是否充足
            // 2.保存订单
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单生产成功: " + orderId);
            // 3.通过MQ来完成消息的分发
            // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
            String exchangeName = "ttl_order_exchange";
            // 路由给三个消息队列推送消息
            String routingKey = "ttl";
    
            /*
             *  #.account.#
             *  *.express.#
             *  sms.#
             */
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    在这里插入图片描述

    1. 测试用例
    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
    
        @Autowired
        OrderServiceImpl orderService;
    
        @Test
        void contextLoads() {
            orderService.createOrderFanout("1","1",12);
        }
    
        @Test
        void testDirect() {
            orderService.createOrderDirect("1","1",12);
        }
    
        @Test
        void testTopic() {
            orderService.createOrderTopic("1","1",12);
        }
    
        // 选择该测试用例
        @Test
        void testTtl() {
            orderService.createOrderTtl("1","1",12);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    1. 查看web界面中的Queues队列

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    8.2 队列里的消息设置过期时间

    队列消息过期时间 - 代码测试

    1. springboot-order-rabbitmq-producer工程下的config包下,在TTLRabbitMQConfiguration.java类添加以下内容
    package com.vinjcent.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class TTLRabbitMQConfiguration {
    
    
        // 1.声明注册fanout模式交换机
        @Bean
        public DirectExchange ttl_directExchange(){
            return new DirectExchange("ttl_order_exchange", true, false);
        }
    
    
        // 2.声明队列
        @Bean
        public Queue ttl_accountQueue(){
            // 设置过期时间
            Map<String, Object> args = new HashMap<>();
            // 根据参数说明设置对应参数
            args.put("x-message-ttl",5000);
            return new Queue("account.ttl.queue", true, false, false, args);
        }
    
        @Bean
        public Queue ttlMessage_accountQueue(){
            return new Queue("account.ttl.message.queue", true, false, false);
        }
    
        @Bean
        public Binding ttl_accountBinding(){
    		// ...
        }
    
    	// 绑定操作
        @Bean
        public Binding ttlMessage_accountBinding(){
            return BindingBuilder.bind(ttlMessage_accountQueue()).to(ttl_directExchange()).with("ttlmessage");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    1. service包下的OrderServiceImpl.java新增 ttlMessage 函数
    package com.vinjcent.rabbitmq.service;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @SuppressWarnings("all")
    @Service
    public class OrderServiceImpl {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // fanout模式
        public void createOrderFanout(String userId, String productId, int num){
            //......
        }
    
        // direct模式
        public void createOrderDirect(String userId, String productId, int num){
            //......
        }
    
        // topic
        public void createOrderTopic(String userId, String productId, int num){
            //......
        }
    
        // ttl
        public void createOrderTtl(String userId, String productId, int num){
            //......
        }
    
        // ttlmessage
        public void createOrderTtlMessage(String userId, String productId, int num){
            // 1.根据商品id查询库存是否充足
            // 2.保存订单
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单生产成功: " + orderId);
            // 3.通过MQ来完成消息的分发
            // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
            String exchangeName = "ttl_order_exchange";
            // 路由给三个消息队列推送消息
            String routingKey = "ttlmessage";
    
            // 给消息设置过期时间
            MessagePostProcessor postProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // 参数类型是字符串
                    message.getMessageProperties().setExpiration("5000");
                    message.getMessageProperties().setContentEncoding("UTF-8");
                    return message;
                }
            };
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, postProcessor);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    1. 测试用例
    package com.vinjcent.rabbitmq;
    
    import com.vinjcent.rabbitmq.service.OrderServiceImpl;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
    
        @Autowired
        OrderServiceImpl orderService;
    
        @Test
        void contextLoads() {
            orderService.createOrderFanout("1","1",12);
        }
    
        @Test
        void testDirect() {
            orderService.createOrderDirect("1","1",12);
        }
    
        @Test
        void testTopic() {
            orderService.createOrderTopic("1","1",12);
        }
    
        @Test
        void testTtl() {
            orderService.createOrderTtl("1","1",12);
        }
    
        // 使用该测试用例
        @Test
        void testTtlMessage() {
            orderService.createOrderTtlMessage("1","1",12);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    在这里插入图片描述

    在这里插入图片描述

    两者的区别在于,过期的消息队列,不会立即删除掉队列里的消息,而是将该队列中的消息放在死信队列中;而过期消息的消息队列则会立马删除掉该指定的消息

    当两者同时存在时,谁的有效期时间短,就以该较短的时间作为标准

    8.3 死信队列

    概述

    DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为私信邮箱。当消息在一个队列中变成了死信(Dead Message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列

    消息变成死信,有如下原因

    • 消息被拒绝
    • 消息过期
    • 队列达到最大长度

    DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq 就会自动将这个消息重新发布到设置的 DLX 上去,进而被路由到另外一个队列,即死信队列

    想要使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可

    在这里插入图片描述

    1. springboot-order-rabbitmq-producer工程下的config包下,添加DeadRabbitMQConfiguration.java类(定义死信交换机和死信队列
    package com.vinjcent.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class DeadRabbitMQConfiguration {
    
    
        // 1.声明注册fanout模式交换机
        @Bean
        public DirectExchange deadDirect(){
            return new DirectExchange("ttl_dead_exchange", true, false);
        }
    
        // 2.声明队列
        @Bean
        public Queue deadQueue(){
            return new Queue("direct.dead.queue", true);
        }
    
        // 3.完成绑定关系(队列)
        // 路由
        @Bean
        public Binding deadBinding(){
            return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    1. 在该工程下修改service包下的TTLRabbitMQConfiguration.java类(为该队列和交换机绑定死信参数
    
    package com.vinjcent.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class TTLRabbitMQConfiguration {
    
    
        // 1.声明注册fanout模式交换机
        @Bean
        public DirectExchange ttl_directExchange(){
            return new DirectExchange("ttl_order_exchange",true,false);
        }
    
    
        // 2.声明队列
        @Bean
        public Queue ttl_accountQueue(){
            // 设置过期时间
            Map<String, Object> args = new HashMap<>();
            // 根据参数说明设置对应参数
            args.put("x-message-ttl",5000);
            // 配置死信交换机
            args.put("x-dead-letter-exchange","ttl_dead_exchange");
            // 配置死信队列的路由key
            args.put("x-dead-letter-routing-key","dead");   // direct模式需要配置路由key,而fanout模式不需要
            return new Queue("account.ttl.queue",true,false,false,args);
        }
    
        // 3.完成绑定关系(队列)
        // direct模式比fanout模式多了一个路由key
        @Bean
        public Binding ttl_accountBinding(){
            return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
        }
    
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    1. 在该工程下修改service包下的OrderServiceImpl.java类下的createOrderTtl函数
    package com.vinjcent.rabbitmq.service;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @SuppressWarnings("all")
    @Service
    public class OrderServiceImpl {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // fanout模式
        public void createOrderFanout(String userId, String productId, int num){
            //......
        }
    
        // direct模式
        public void createOrderDirect(String userId, String productId, int num){
            //......
        }
    
        // topic
        public void createOrderTopic(String userId, String productId, int num){
            //......
        }
    
        // ttl
        public void createOrderTtl(String userId, String productId, int num){
            // 1.根据商品id查询库存是否充足
            // 2.保存订单
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单生产成功: " + orderId);
            // 3.通过MQ来完成消息的分发
            // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
            String exchangeName = "ttl_order_exchange";
            // 路由给三个消息队列推送消息
            String routingKey = "ttl";
    
            /*
             *  #.account.#
             *  *.express.#
             *  sms.#
             */
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    
        }
    
        // ttlmessage
        public void createOrderTtlMessage(String userId, String productId,int num){
            //......
        }
    
    
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    配置的死信队列参数可在web界面查看

    在这里插入图片描述

    在这里插入图片描述

    1. 运行测试用例
    package com.vinjcent.rabbitmq;
    
    import com.vinjcent.rabbitmq.service.OrderServiceImpl;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
    
        @Autowired
        OrderServiceImpl orderService;
    
        @Test
        void contextLoads() {
            orderService.createOrderFanout("1","1",12);
        }
    
        @Test
        void testDirect() {
            orderService.createOrderDirect("1","1",12);
        }
    
        @Test
        void testTopic() {
            orderService.createOrderTopic("1","1",12);
        }
    
        // 使用该测试用例
        @Test
        void testTtl() {
            orderService.createOrderTtl("1","1",12);
        }
    
        @Test
        void testTtlMessage() {
            orderService.createOrderTtlMessage("1","1",12);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    在这里插入图片描述

    报错原因:由于已创建的队列,在原来的基础上进行代码的修改,这个队列不会被更改或者覆盖,只能通过删除之后重新运行

    在这里插入图片描述

    在实际开发中,并不推荐直接从线上直接删除,这样做的风险比较大。可以通过重新创建一个队列,将要删除的队列的死信队列与之绑定,将死信队列里的信息用生产者发送到新的队列当中,实现转换/迁移的过程

    在这里插入图片描述

    在这里插入图片描述

    对于参数的设置,可以在web界面查看异常队列规则,从而限制队列消息的推入
    在这里插入图片描述

    下一篇文章内存磁盘的监控

  • 相关阅读:
    国标视频平台EasyGBS调用快照接口,未能正常返回快照图片该如何解决?
    LLM探索:环境搭建与模型本地部署
    使用Docker开发GO应用程序
    Python跨语言调用Java服务Rpc接口
    Unity Xlua热更新框架(六):场景管理
    基于单片机体温脉搏检测控制系统及源程序
    【web-代码审计】(14.2)常见漏洞签名
    SQL的delete和drop
    netty系列之: 在netty中使用 tls 协议请求 DNS 服务器
    《opencv学习笔记》-- 线性滤波:方框滤波、均值滤波、高斯滤波
  • 原文地址:https://blog.csdn.net/Wei_Naijia/article/details/126564240