• Delay - 如何用 Redis 打造一个延迟队列、广播(附加工具)


    Delay - 如何用 Redis 打造一个延迟队列、广播(附加描述)

    1. 工具

    当然在写一个项目的时候肯定是会造一些工具类的轮子的,在 Redis 延迟队列中,涉及到的工具如下:

    1.1 智能睡眠工具

    睡眠工具是根据搬运线程每次搬运之后返回剩余的队列中最小的分数,因为在 zset 中分数是用时间戳,所以拿到最小的时间戳就可以计算出本次睡眠的时间:

    /**
     * 根据搬运的结果中返回队列中剩余的最小分数,通过分数计算出本次睡眠的时间
     *
     * @author zyred
     * @since v 0.5 新建
     * @since v 1.2 优化 v 0.5 版本中根据剩余数量计算睡眠时间的方式
     */
    public class SmartSleepUtil {
        /**
         * @update v 1.2 没有值的情况,睡眠 5 分钟
         */
        private static final int MAX_SLEEP_SEC = 5 * 60 * 1000;
        /**
         * 这里保存当前线程的目的,是为了任何时候需要打断搬运线程睡眠都是可以的,并且是立即打断
         */
        private volatile static Thread transferThread = null;
    
        /**
         * 立即唤醒,起来上班.......
         * 该方法存在多个线程同时调用的情况,所以需要加锁,且 transferThread 要内存可见
         */
        public static synchronized void immediatelyArouse () {
            if (Objects.nonNull(transferThread)) {
                LockSupport.unpark(transferThread);
                transferThread = null;
            }
        }
    
        /**
         * 搬运线程睡眠
         *
         * @param min    timeout_table 列表内最小的一个时间
         */
        public static void sleep (long min) {
            transferThread = Thread.currentThread();
            if (min == 0) {
                LockSupport.parkNanos(MAX_SLEEP_SEC * Constant.nanos);
                transferThread = null;
                return;
            }
            if (System.currentTimeMillis() >= min) {
                transferThread = null;
                return;
            }
            long sleepTime = min - System.currentTimeMillis();
            if (sleepTime == 0) {
                transferThread = null;
                return;
            }
            LockSupport.parkNanos(sleepTime * Constant.nanos);
            transferThread = null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    1.2 spring boot starter 注解驱动

    /**
     * 开启 spring 自动配置
     */
    @Target(ElementType.TYPE)
    @Import(RedisDelayQueueAutoConfiguration.class)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface EnableDelayQueue {
    }
    
    
    @AllArgsConstructor
    @EnableConfigurationProperties(RedisDelayQueueProperties.class)
    public class RedisDelayQueueAutoConfiguration {
    
        private Environment environment;
        private RedisDelayQueueProperties properties;
    
        /**
         * spring 加载延迟队列核心 bean 后初始化隔离策列
         * @since v 0.4
         */
        @PostConstruct
        public void parseServerPort () {
            final String port = this.environment.getProperty(Constant.serverPort);
            final String address = IpUtils.getAddress();
            String isolation = address + Constant.underline + port;
            isolation = DigestUtils.md5DigestAsHex(isolation.getBytes(StandardCharsets.UTF_8)).toUpperCase(Locale.ROOT);
            isolation = isolation.substring(0, Constant.isolation);
            this.properties.setIsolation(isolation);
        }
    
        @Bean
        public DelayQueueContextFactory delayQueueCoreContext (RedissonClient client) {
            return new DelayQueueContextFactory(this.properties, client);
        }
    
        @Bean
        public RedissonClient redissonClient () {
            Config config = this.properties.getConfig();
            return Redisson.create(config);
        }
    
        @Bean
        public ProviderDelayJob<QueueJob> providerDelayQueue (RedissonClient client) {
            return this.delayQueueCoreContext(client).getProviderDelayQueue();
        }
    
        @Bean
        public CrashHandlerContext<QueueJob> shutdownHandlerContext (DelayQueueContextFactory factory) {
            return new CrashHandlerContext<>(factory, this.properties);
        }
    
        /**
         * 广播
         * @return  广播上下文工厂
         * @since v 1.0
         */
        @Bean
        @ConditionalOnProperty(prefix = "delay.queue", name = "enable-radio", havingValue = "true")
        public DelayRadioContextFactory radioQueueContextFactory (RedissonClient redissonClient, RedisDelayQueueProperties properties) {
            return new DelayRadioContextFactory(redissonClient, properties);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    2. lua 脚本

    lua 脚本只展示搬运脚本,其他脚本都非常简单

    ## 构造返回结果
    local resultMap = {}
    ## 查看 zset 中剩余未搬运 TOPIC 数量
    local topicCount = redis.call('ZCard', KEYS[2])
    if (tonumber(topicCount) > 0)
    then
    	## 根据当前传入的时间戳,拿到时间都小于当前的所有 topic
        local member = redis.call('ZRangeByScore', KEYS[2], 0, tonumber(ARGV[1]))
        if next(member) == nil
        then
        	## 成功搬运的 TOPIC
            table.insert(resultMap, '')
            local min = redis.call('ZRangeByScore', KEYS[2], '-inf', '+inf', 'WithScores', 'limit', 0, 1)
            ## 剩余的 TOPIC 数量
            table.insert(resultMap, min)
            ## 广播的 TOPIC
            table.insert(resultMap, '')
            return resultMap
        else
        	## 准备变量,装填已经搬运的 topic,返回给 Java
            local readyTopic = ''
            ## 准备变量,装填需要广播的 topic
            local publishTopic = ''
            ## 便利读取出来的内容
            for key, value in ipairs(member)
            do	
            	## 循环内,每个都要去验证以下时间是不是满足被消费的条件
                local score = redis.call('ZScore', KEYS[2], value)
                if tonumber(ARGV[1]) >= tonumber(score)
                then
                	## 字符串切分,没啥好说的
                    local index = string.find(value, ':')
                    local subTopic = string.sub(value, 1, index - 1)
                    local topic = string.gsub(subTopic, '\"', '')
                    ## 取值,看看在 TOPIC 上是否有广播的标识
                    local radioTag = string.find(topic, ARGV[3])
                    if radioTag == nil
                    then
                    	## 这种写法是为了避免 阿里云 对 lua 脚本的限制,实际上
                    	## Java 传入的参数中,没有数组下标为 4 的内容
                        KEYS[4] = KEYS[1] ..':'.. ARGV[2] ..':'..topic
                        ## 将数据添加到 set 集合中
                        redis.call('SAdd', KEYS[4], value)
                        ## 删除掉 zset 里面的内容
                        redis.call('ZRem', KEYS[2], value)
                        ## 重新赋值,不然会内容叠加
                        KEYS[4] = ''
                        readyTopic = readyTopic .. ',' .. value
                    else
                    	## 处理广播,只需要返回给 Java 即可,后续的逻辑由 Java 来发布
                        redis.call('ZRem', KEYS[2], value)
                        publishTopic = publishTopic .. ',' .. value
                    end
                else
                    return nil;
                end
            end
            ## 重新读取一次剩余队列中最小的分数,提供给下次计算
            local min = redis.call('ZRangeByScore', KEYS[2], '-inf', '+inf', 'WithScores', 'limit', 0, 1)
            ## 装填数据,返回
            table.insert(resultMap, readyTopic)
            table.insert(resultMap, min)
            table.insert(resultMap, publishTopic)
            return resultMap
        end
        return nil
    end
    return nil
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    3. 完结总结

    说明:本文所写的项目是 无法开源 的,通过几篇文章的表述,已经将核心的代码与设计思想梳理清楚,如果疑问,可以联系博主一起优化和重新设计。

    本项目的来源也是对公司的一个 2017 年的现有功能进行了全面优化和提升性能和重构,项目开发周期大概一个月时间,因为其中也遇到过一些棘手的问题需要解决,并且利用的是业余时间进行开发的,所以周期相对拉长了很多。

    对我而言,开发这个项目确实有一定的提升,起码奠定了我对中间件的认知程度,让我对中间件开发有了更深入的兴趣。

    完 …

  • 相关阅读:
    各种添加路由的方法
    VMware vSphere ESXI 6.7 U3封装RTL8125B网卡驱动
    无法调试MFC源码
    电视机顶盒哪个牌子好?拆机达人盘点网络电视机顶盒排名
    知识图谱实体对齐3:无监督和自监督的方法
    五、Docker仓库之https的registry搭建(二)
    [每周一更]-(第22期):什么是gRPC?
    java实现pdf转为word
    基于JAVA客户台账管理计算机毕业设计源码+系统+mysql数据库+lw文档+部署
    flutter开发实战-video_player播放多个视频MediaCodecVideoRenderer error问题
  • 原文地址:https://blog.csdn.net/qq_38800175/article/details/124846028