• RabbitMQ实现延迟队列


    实现方式一:死信队列

    AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
    但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:

    1、Time To Live(TTL)

    RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
    RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
    A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
    B: 对消息进行单独设置,每条消息TTL可以不同。

    2、Dead Letter Exchanges(DLX)

    RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

    x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
    x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

    队列出现dead letter的情况

    1、消息或者队列的TTL过期
    2、队列达到最大长度
    3、消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
    代码实现
    首先加入依赖

    
                org.springframework.amqp
                spring-rabbit-test
                test
            
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
                cn.hutool
                hutool-all
                5.7.16
            
            
                com.alibaba
                fastjson
                1.2.78
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    编写配置文件
    application.yml

    spring:
      application:
        name: delay-queue
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: admin123456
        virtual-host: delays1
    server:
      port: 8082
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    编写队列配置文件

    #订单业务队列的名称,交换机名称、路由key、超时时间
    delay.bussiness.queue: order_time_out_15s
    delay.bussiness.excahnge: order_time_out_15s_exchange
    delay.bussiness.route: order_time_out_15s_776
    
    #死信队列的名称,交换机名称、路由key、超时时间
    delay.dead.queue: dead_order_time_out_15s
    delay.dead.excahnge: dead_order_time_out_15s_exchange
    delay.dead.route: dead_order_time_out_15s_666
    delay.bussiness.order.timeout: 15000
    
    #利用rabbitmq_delayed_message_exchange实现延迟队列的方式
    #插件实现订单业务队列的名称,交换机名称、路由key、超时时间
    delay.plugins.queue: plugin_order_delay_30
    delay.plugins.exchange: plugin_order_exchange_30
    delay.plugins.route.key: plugin_order_route_key_30
    delay.plugin.timeout: 30000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    编写配置读取类

    @Configuration
    @PropertySource("classpath:rabbitmqs.properties")
    @Data
    public class OrderDelayConfig {
    
        @Value("${delay.bussiness.queue}")
        private String orderDelayQueueName;
    
        @Value("${delay.bussiness.excahnge}")
        private String orderDelayExchangeName;
    
        @Value("${delay.bussiness.route}")
        private String orderDelayRouteKey;
    
    
        @Value("${delay.dead.queue}")
        private String orderDeadDelayQueueName;
    
        @Value("${delay.dead.excahnge}")
        private String orderDeadDelayExchangeName;
    
        @Value("${delay.dead.route}")
        private String orderDeadDelayRouteKey;
    
        @Value("${delay.bussiness.order.timeout}")
        private Long timeout;
    
        @Value("${delay.plugins.queue}")
        private String pluginOrderQueueName;
    
        @Value("${delay.plugins.exchange}")
        private String pluginOrderExchangeName;
    
        @Value("${delay.plugins.route.key}")
        private String pluginOrderRouteKey;
    
        @Value("${delay.plugin.timeout}")
        private Long pluginTimeout;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    编写队列、交换机创建、交换机和队列、路由key值绑定的配置类

      //rabbitMq内置死信队列信息
        private final String dlexchange = "x-dead-letter-exchange";
        private final String dlRouteKey = "x-dead-letter-routing-key";
        private final String ttl = "x-message-ttl";
        @Autowired
        private  OrderDelayConfig orderDelayConfig;
    
        //创建死信交换机
        @Bean("orderDeadExchange")
        public DirectExchange deadTopicExchange() {
            return new DirectExchange(orderDelayConfig.getOrderDeadDelayExchangeName());
        }
        //创建业务交换机
        @Bean("orderExchange")
        public DirectExchange payTopicExchange() {
            return new DirectExchange(orderDelayConfig.getOrderDelayExchangeName());
        }
        
        //创建死信队列
        @Bean("orderDeadQueue")
        public Queue deadQueue() {
            return new Queue(orderDelayConfig.getOrderDeadDelayQueueName());
        }
    
        /**
         * 创建订单超时队列
         */
        @Bean("orderQueue")
        public Queue payQueue() {
            Map params = new HashMap<>();
            //设置队列的过期时间
            params.put(ttl,  orderDelayConfig.getTimeout());
            //声明当前队列绑定的死信交换机
            params.put(dlexchange, orderDelayConfig.getOrderDeadDelayExchangeName());
            //声明当前队列的死信路由键
            params.put(dlRouteKey, orderDelayConfig.getOrderDeadDelayRouteKey());
    
            return QueueBuilder.durable(orderDelayConfig.getOrderDelayQueueName()).withArguments(params).build();
    
        }
        
        //订单数据队列绑定交换机和route的key值
        @Bean
        public Binding delayBindingA(@Qualifier("orderQueue") Queue queue,
                                     @Qualifier("orderExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getOrderDelayRouteKey());
        }
    
    
        //死信队列与死信交换机进行绑定
        @Bean
        public Binding BindingErrorQueueAndExchange(@Qualifier("orderDeadQueue") Queue deadQueue,
                                                    @Qualifier("orderDeadExchange") DirectExchange exchange) {
            return BindingBuilder.bind(deadQueue).to(exchange).with(orderDelayConfig.getOrderDeadDelayRouteKey());
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    以上已经完成了配置工作,下面需要完成业务代码实现
    1、新建订单实体类

    @Data
    public class Order {
    
        /**
         * 订单编号
         */
        private String orderNo;
        /**
         * 价格(元)
         */
        private BigDecimal price;
        /**
         * 商品数量
         */
        private int prodductNum;
        /**
         * 总金额
         */
        private BigDecimal totalAmount;
    
        /**
         * 创建时间
         */
        private Date createTime;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    定义消息发送和消息消费

    @Component
    @EnableScheduling
    @Slf4j
    public class OrderDelayQueue {
    
        private RabbitTemplate rabbitTemplate;
        private OrderDelayConfig orderDelayConfig;
        private final static String orderQueueName = "dead_order_time_out_15s";
    
    
        public OrderDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) {
            this.rabbitTemplate = rabbitTemplate;
            this.orderDelayConfig = orderDelayConfig;
        }
    
    
        @Scheduled(cron = "0/30 * * * * ?")
        public void sendOrderMsg() {
            Order order;
            for (int i = 0; i < 3; i++) {
                // Thread.sleep(1000);
                order = new Order();
                order.setOrderNo(new Snowflake().nextIdStr());
                order.setCreateTime(new Date());
                rabbitSendMsg(JSON.toJSONString(order));
            }
    
    
        }
    
        /**
         * 发送消息
         *
         * @param msg
         */
        public void rabbitSendMsg(String msg) {
            rabbitTemplate.convertAndSend(orderDelayConfig.getOrderDelayExchangeName(),
                    orderDelayConfig.getOrderDelayRouteKey(), msg);
        }
    
        //消费死信队列的消息
        @RabbitListener(queues = orderQueueName)
        public void infoConsumption(String data) throws Exception {
            //此处编写执行订单超时状态的逻辑代码
            final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
            final Order order = JSONObject.parseObject(data, Order.class);
            final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000;
            log.info(order.getOrderNo() + "死信队列========:订单已经超时了" + "失效时间" + diff + "秒");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    到此完成全部代码,启动项目执行即可完成死信队列实现延迟队列功能。

    方式2:利用rabbitmq_delayed_message_exchange实现延迟队列

    安装插件

    1、下载延时消息插件:https://www.rabbitmq.com/community-plugins.html
    在这里插入图片描述
    2、将下载的文件放在rabbitmq的安装路径plugins文件中

    在这里插入图片描述
    安装插件
    打开rabbitmq的命令界面
    执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    在这里插入图片描述
    入上图所示表示安装成功

    代码实现

    配置信息,在第一种方式中已配置

    1、队列、交换机、路由key值创建,绑定

     //插件实现创建业务交换机
        @Bean("pluginOrderExchange")
        public CustomExchange pluginExchange() {
            Map args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(orderDelayConfig.getPluginOrderExchangeName(),
                    "x-delayed-message", true, false, args);
        }
    
    
        /**
         * 插件实现创建订单超时队列
         */
        @Bean("pluginOrderQueue")
        public Queue pluginQueue() {
            return new Queue(orderDelayConfig.getPluginOrderQueueName());
    
        }
    
        /**
         * 插件实现绑定交换机,和指定的key值
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding delayBindingPlugin(@Qualifier("pluginOrderQueue") Queue queue,
                                          @Qualifier("pluginOrderExchange") CustomExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getPluginOrderRouteKey()).noargs();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2、消息发送,消息消费

    @Component
    @EnableScheduling
    @Slf4j
    public class OrderPluginDelayQueue {
    
        private RabbitTemplate rabbitTemplate;
        private OrderDelayConfig orderDelayConfig;
        private final static String orderQueueName = "plugin_order_delay_30";
        public OrderPluginDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) {
            this.rabbitTemplate = rabbitTemplate;
            this.orderDelayConfig = orderDelayConfig;
        }
    
    
        @Scheduled(cron = "0/30 * * * * ?")
        public void sendOrderMsg() {
            Order order;
            for (int i = 0; i < 3; i++) {
                // Thread.sleep(1000);
                order = new Order();
                order.setOrderNo(new Snowflake().nextIdStr());
                order.setCreateTime(new Date());
                rabbitSendMsg(JSON.toJSONString(order),orderDelayConfig.getPluginTimeout().intValue());
            }
    
    
        }
    
        /**
         * 发送消息
         *
         * @param msg
         */
        public void rabbitSendMsg(String msg,int delayTime) {
            rabbitTemplate.convertAndSend(orderDelayConfig.getPluginOrderExchangeName(),
                    orderDelayConfig.getPluginOrderRouteKey(), msg,s->{
                    s.getMessageProperties().setDelay(delayTime);
                    return s;
                    });
        }
    
        //消费死信队列的消息
        @RabbitListener(queues = orderQueueName)
        public void infoConsumption(String data) throws Exception {
            //此处编写执行订单超时状态的逻辑代码
            final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
            final Order order = JSONObject.parseObject(data, Order.class);
            final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000;
            log.info(order.getOrderNo() + "插件实现=============订单已经超时了" + "失效时间" + diff + "秒");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    以上完成第二种方式代码
    实现结果截图
    在这里插入图片描述

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    golang gin 监听rabbitmq队列无限消费
    7.6 分组的背包问题7.7 有依赖的背包问题7.8 泛化物品
    69.Qt 实现Https ssl证书与内网服务器ip进行双向认证
    容器 - 八大架构的演进
    Python Day6列表进阶【零基础】
    《idea 骚操作》
    秋招每日一题T14——将矩阵按对角线排序
    蓝桥杯打卡Day9
    C++精通之路:红黑树的应用(模拟实现map/set)
    异构计算技术分析
  • 原文地址:https://blog.csdn.net/web15085599741/article/details/126113967