• SpringBoot系统搭建集成-RabbitMq延时队列


    SpringBoot系统搭建集成-RabbitMq延时队列

    一、介绍

    什么是延时队列?

    延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费

    延迟交换机主要帮我们解决什么问题

    ​ (1)当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, name必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;

    ​ (2)就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的

    适用场景

    (1)商城订单超时未支付,取消订单

    (2)使用权限到期前十分钟提醒用户

    (3)收益项目,投入后一段时间后产生收益

    实现方式

    从以上场景中,我们可以看出,延时队列的主要功能就是在指定的时间之后做指定的事情,那么,我们思考有哪些工具我们可以使用?

    • Redis 监听过期 Key
    • RabbitMQ等实现延时队列

    使用 RabbitMQ延时队列有两种方式

    • 利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
    • 利用 RabbitMQ 中的插件 x-delay-message

    下载插件

    RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 官网下载到它

    https://www.rabbitmq.com/community-plugins.html
    
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    
    • 1
    • 2
    • 3

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NRrMMjMY-1686048160057)(typora-user-images/image-20230208095132473.png)]

    选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为

    docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez  rabbitmq1:/plugins
    
    docker exec  rabbitmq1  rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1
    • 2
    • 3

    在SpringBoot整合RabbitMQ

    引入 RabbitMQ 依赖

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    配置 RabbitMQ 信息

    spring:
      rabbitmq:
        host: 127.0.0.1
        #host: 10.106.10.91
        port: 5672
        username: admin
        password: 123456
        virtual-host: pub
        publisher-confirms: true   # 开启发送确认
        publisher-returns: true  # 开启发送失败回退
          #开启ack
        listener:
          direct:
            acknowledge-mode: manual
          simple:
            acknowledge-mode: manual #采取手动应答
            #concurrency: 1 # 指定最小的消费者数量
            #max-concurrency: 1 #指定最大的消费者数量
            retry:
              enabled: true # 是否支持重试
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    RabbitMQ 常量类

    package com.github.cundream.springbootbuilding.common.rabbitmq;
    
    /**
     * @className: com.github.cundream.springbootbuilding.common.rabbitmq-> RabbitConst
     * @description:
     * @author: 李村 
     * @createDate:
     */
    public class RabbitConst {
        /**
         * 交换机
         */
        public static final String DELAY_EXCHANGE = "delay_exchange";
    
        /**
         * 队列
         */
        public static final String DELAY_QUEUE = "delay_queue";
    
        /**
         * 路由
         */
        public static final String DELAY_KEY = "delay_key";
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    RabbitMQ 配置类

    package com.github.cundream.springbootbuilding.common.rabbitmq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @className: com.github.cundream.springbootbuilding.common.rabbitmq-> RabbitConfig
     * @description:
     * @author: 李村 200900681
     * @createDate:
     */
    @Configuration
    @Slf4j
    public class RabbitConfig {
    
    
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
            return rabbitTemplate;
        }
    
        /**
         * 直接模式队列1
         */
        @Bean
        public Queue directOneQueue() {
            return new Queue("cundream");
        }
        /**
         * 延时队列交换机
         *
         * @return
         */
        @Bean
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(RabbitConst.DELAY_EXCHANGE, "x-delayed-message", true, false, args);
        }
    
        /**
         * 延时队列
         *
         * @return
         */
        @Bean
        public Queue delayQueue() {
            return new Queue(RabbitConst.DELAY_QUEUE, true);
        }
    
        /**
         * 给延时队列绑定交换机
         *
         * @return
         */
        @Bean
        public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
            return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConst.DELAY_KEY).noargs();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    RabbitMQ 生产者

    package com.github.cundream.springbootbuilding.service.impl;
    
    import com.github.cundream.springbootbuilding.common.rabbitmq.RabbitConst;
    import com.github.cundream.springbootbuilding.service.RabbitMqService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @className: com.github.cundream.springbootbuilding.service.impl-> RabbitMqServiceImpl
     * @description:
     * @author: 李村 
     * @createDate:
     */
    @Service
    @Slf4j
    public class RabbitMqServiceImpl implements RabbitMqService {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void sendDelayMessage(Object object, long millisecond) {
            this.rabbitTemplate.convertAndSend(
                    RabbitConst.DELAY_EXCHANGE,
                    RabbitConst.DELAY_KEY,
                    object.toString(),
                    message -> {
                        message.getMessageProperties().setHeader("x-delay", millisecond);
                        return message;
                    }
            );
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    RabbitMQ 消费者

    package com.github.cundream.springbootbuilding.common.rabbitmq.consumer;
    
    import cn.hutool.json.JSONUtil;
    import com.github.cundream.springbootbuilding.common.rabbitmq.RabbitConst;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * @className: com.github.cundream.springbootbuilding.common.rabbitmq.consumer-> ReceiveDealyConsumer
     * @description:
     * @author: 李村 
     * @createDate:
     */
    @Slf4j
    @RabbitListener(queuesToDeclare = @Queue(RabbitConst.DELAY_QUEUE))
    @Component
    public class ReceiveDealyHandler {
        @RabbitHandler
        public void directHandlerManualAck(Object object, Message message, Channel channel) {
            //  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                log.info("直接队列1,手动ACK,接收消息:{}", object.toString());
                // 通知 MQ 消息已被成功消费,可以ACK了
                channel.basicAck(deliveryTag, false);
            } catch (IOException e) {
                try {
                    // 处理失败,重新压入MQ
                    channel.basicRecover();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    测试

    通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功

        @RequestMapping(value = "/delayMessage",method = RequestMethod.GET)
        public void delayMessage() {
            String message1 = "这是第一条消息";
            String message2 = "这是第二条消息";
            rabbitMqService.sendDelayMessage(message1, 5000);
            rabbitMqService.sendDelayMessage(message2, 10000);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    探讨苹果商店那些“变身包”究竟是怎么上架的
    面试题:Java 序列化和反序列化为什么要实现 Serializable 接口?
    stm32——GPIO学习
    VUE3版本新特性
    [JS] canvas 详解
    常见注释的格式
    java-php-python-ssm新冠病毒感染者档案信息管理系统计算机毕业设计
    基于SkeyeVSS系统实现政府综合性视频监控平台管理解决方案
    Java资深架构师带你深度“吃透”字节跳动的亿级流量+高并发,这还不冲?
    实战项目: 负载均衡
  • 原文地址:https://blog.csdn.net/qq_34599132/article/details/131073877