• 基于Rabbitmq和Redis的延迟消息实现


    1 基于Rabbitmq延迟消息实现

    支付时间设置为30,未支付的消息会积压在mq中,给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理
    在这里插入图片描述
    在这里插入图片描述

    1.1定义延迟消息实体

    由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体

    @Data
    public class MultiDelayMessage<T> {
        /**
         * 消息体
         */
        private T data;
        /**
         * 记录延迟时间的集合
         */
        private List<Long> delayMillis;
    
        public MultiDelayMessage(T data, List<Long> delayMillis) {
            this.data = data;
            this.delayMillis = delayMillis;
        }
        public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
            return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
        }
    
        /**
         * 获取并移除下一个延迟时间
         * @return 队列中的第一个延迟时间
         */
        public Long removeNextDelay(){
            return delayMillis.remove(0);
        }
    
        /**
         * 是否还有下一个延迟时间
         */
        public boolean hasNextDelay(){
            return !delayMillis.isEmpty();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    1.2 定义常量,用于记录交换机、队列、RoutingKey等常量

    package com.hmall.trade.constants;
    
    public interface MqConstants {
        String DELAY_EXCHANGE = "trade.delay.topic";
        String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
        String DELAY_ORDER_ROUTING_KEY = "order.query";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.3 抽取mq配置到nacos中

    spring:
      rabbitmq:
        host: ${hm.mq.host:192.168.150.101} # 主机名
        port: ${hm.mq.port:5672} # 端口
        virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
        username: ${hm.mq.un:hmall} # 用户名
        password: ${hm.mq.pw:123} # 密码
        listener:
          simple:
            prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1.4 定义消息处理器

    使用延迟消息处理器发送消息
    在这里插入图片描述
    在这里插入图片描述

    1.5 消息监听与延迟消息再次发送

    在这里插入图片描述
    在这里插入图片描述

    2 延迟消息实现

    DelayQueue:基于JVM,保存在内存中,会出现消息丢失
    在这里插入图片描述

    Rabbitmq的延迟任务:基于TTL和死信交换机
    在这里插入图片描述

    2.1 redis的延迟任务:基于zset的去重和排序功能

    在这里插入图片描述
    1.为什么任务需要存储在数据库中?
    延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

    2.为什么使用redis中的两种数据类型,list和zset?

    • 原因一: list存储立即执行的任务,zset存储未来的数据
    • 原因二:任务量过大以后,zset的性能会下降

    时间复杂度:执行时间(次数) 随着数据规模增长的变化趋势

    • 操作redis中的list命令LPUSH: 时间复杂度: O(1)
    • 操作redis中的zset命令zadd: 时间复杂度: (Mlog(n))

    2.2 设计mybatis映射实体类:

    /**
         * 版本号,用乐观锁
         */
        @Version
        private Integer version;
    
    乐观锁支持:
    /**
         * mybatis-plus乐观锁支持
         * @return
         */
    @Bean
    public MybatisPlusInterceptor optimisticLockerInterceptor(){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
        return interceptor;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2.3 创建task类,用于接收添加任务的参数

    @Data
    public class Task implements Serializable {
    
        /**
         * 任务id
         */
        private Long taskId;
        /**
         * 类型
         */
        private Integer taskType;
    
        /**
         * 优先级
         */
        private Integer priority;
    
        /**
         * 执行id
         */
        private long executeTime;
    
        /**
         * task参数
         */
        private byte[] parameters;
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.4 添加任务

    2.4.1 添加任务到数据库中

    addTaskToDb(task);修改任务表和日志表

    @Autowired
        private TaskinfoMapper taskinfoMapper;
    
        @Autowired
        private TaskinfoLogsMapper taskinfoLogsMapper;
    
        /**
         * 添加任务到数据库中
         *
         * @param task
         * @return
         */
        private boolean addTaskToDb(Task task) {
    
            boolean flag = false;
    
            try {
                //保存任务表
                Taskinfo taskinfo = new Taskinfo();
                BeanUtils.copyProperties(task, taskinfo);
                taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
                taskinfoMapper.insert(taskinfo);
    
                //设置taskID
                task.setTaskId(taskinfo.getTaskId());
    
                //保存任务日志数据
                TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
                BeanUtils.copyProperties(taskinfo, taskinfoLogs);
                taskinfoLogs.setVersion(1);
                taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
                taskinfoLogsMapper.insert(taskinfoLogs);
    
                flag = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return flag;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    2.4.2 添加任务到redis

    addTaskToCache(task);判断任务执行之间是否在现在还是未来五分钟内

    @Autowired
        private CacheService cacheService;
    
        /**
         * 把任务添加到redis中
         *
         * @param task
         */
        private void addTaskToCache(Task task) {
    
            String key = task.getTaskType() + "_" + task.getPriority();
    
            //获取5分钟之后的时间  毫秒值
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.MINUTE, 5);
            long nextScheduleTime = calendar.getTimeInMillis();
    
            //2.1 如果任务的执行时间小于等于当前时间,存入list
            if (task.getExecuteTime() <= System.currentTimeMillis()) {
                cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
            } else if (task.getExecuteTime() <= nextScheduleTime) {
                //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
                cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
            }
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    2.5 删除任务

    1、删除数据库任务表,更改日志表任务状态
    2、删除list或者zset中的任务

    在TaskService中添加方法

    /**
         * 取消任务
         * @param taskId        任务id
         * @return              取消结果
         */
    public boolean cancelTask(long taskId);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    /**
         * 取消任务
         * @param taskId
         * @return
         */
    @Override
    public boolean cancelTask(long taskId) {
    
        boolean flag = false;
    
        //删除任务,更新日志
        Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
    
        //删除redis的数据
        if(task != null){
            removeTaskFromCache(task);
            flag = true;
        }
    
    
    
        return false;
    }
    
    /**
         * 删除redis中的任务数据
         * @param task
         */
    private void removeTaskFromCache(Task task) {
    
        String key = task.getTaskType()+"_"+task.getPriority();
    
        if(task.getExecuteTime()<=System.currentTimeMillis()){
            cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
        }else {
            cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
        }
    }
    
    /**
         * 删除任务,更新任务日志状态
         * @param taskId
         * @param status
         * @return
         */
    private Task updateDb(long taskId, int status) {
        Task task = null;
        try {
            //删除任务
            taskinfoMapper.deleteById(taskId);
    
            TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
            taskinfoLogs.setStatus(status);
            taskinfoLogsMapper.updateById(taskinfoLogs);
    
            task = new Task();
            BeanUtils.copyProperties(taskinfoLogs,task);
            task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
        }catch (Exception e){
            log.error("task cancel exception taskid={}",taskId);
        }
    
        return task;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    2.6 消费任务

    1、删除list中的数据
    2、使用updateDB删除任务表、跟新日志表

    在TaskService中添加方法

    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    public Task poll(int type,int priority);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    实现

    /**
         * 按照类型和优先级拉取任务
         * @return
         */
    @Override
    public Task poll(int type,int priority) {
        Task task = null;
        try {
            String key = type+"_"+priority;
            String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
            if(StringUtils.isNotBlank(task_json)){
                task = JSON.parseObject(task_json, Task.class);
                //更新数据库信息
                updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
            }
        }catch (Exception e){
            e.printStackTrace();
            log.error("poll task exception");
        }
    
        return task;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.7 未来定时任务更新-reids管道

    减少与redis的交互次数
    1、在引导类中添加开启任务调度注解:@EnableScheduling
    2、在service中添加定时任务 @Scheduled(cron = “0 */1 * * * ?”),每分钟一次

    @Scheduled(cron = "0 */1 * * * ?")
    public void refresh() {
        System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
    
        // 获取所有未来数据集合的key值
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
        for (String futureKey : futureKeys) { // future_250_250
    
            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
            //获取该组key下当前需要消费的任务数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
            if (!tasks.isEmpty()) {
                //将这些任务数据添加到消费者队列中
                cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
    
            List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
                @Nullable
                @Override
                public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                    StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
                    String[] strings = values.toArray(new String[values.size()]);
                    stringRedisConnection.rPush(topic_key,strings);
                    stringRedisConnection.zRem(future_key,strings);
                    return null;
                }
            });
            return objects;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    总结

    1、使用rebbitmq使用的场景是在支付和订单微服务中,用于实现消息可以延迟30分钟付款的功能。并借用该中间件的插件实现支付的异步下单功能,并可以快速处理前几分钟,防止消息堆积
    2、使用redis是基于zset的去重和排序功能,相当于将一定数据的保存在数据库,使用定时任务同步数据库符合五分钟的任务到zset中,然后,在在zest中定时更新可以运行的任务到list集合中,相当于实现了延迟功能和缓存功能。
    3、第二种还可以扩展为将rabbitmq中等待时间较长的数据存到redis中,然后定时的去同步redis中的数据到数据库中,防止消息堆积。

  • 相关阅读:
    ros2移植Apollo和autoware规控算法可跑工程
    Python中迭代字典并在迭代过程中修改元素的解决方法
    一文揭秘育碧等传统游戏大厂的NFT策略
    Python之requests实现github模拟登录
    accent-color的使用
    java-net-php-python-901ssm高校选用教材子系统ppt计算机毕业设计程序
    如何隐藏Selenium特征实现自动化网页采集
    【Android】【实践】
    Verilog 随机数及概率分布
    【Java】绘图入门和机制,绘图方法演示(绘制坦克)
  • 原文地址:https://blog.csdn.net/m0_57084845/article/details/134391398