• 手撸时间轮


    1、思路

    1.1、 用Map存储时间戳和List任务的对应关系。

    1. 时间戳的好处不在乎时区,任何时区生成的时间戳都一样
    2. 看时间轮一格的时间是多少,我设置是1秒。就是需要在这1秒中执行的所有任务

    1.2、一个线程分发任务

    1. 用一个线程,将需要在这一秒执行的任务,丢到线程池中执行
    2. 我设计的一个线程执行,用了一种比较巧妙的设计,通过变量IS_RUNNING来控制这个线程是否在运行,没运行则new Thread().start()。好处就是,任务走完了,这个线程就销毁了,有新的线程过来,就又再新建一个线程
    3. 设置线程名称好排查错误
    4. 1s一格的时间轮,任务分发完后,就sleep 1s,分发任务的时间就先忽略,没有那么高得要求
    5. 自定义接口函数,而没用Supplier,为了约束任务类型

    1.3、线程池执行真正的业务任务

    1. 线程分发时,拿到需要这一秒执行的任务,就丢到线程池执行

    2、代码

    2.1、接口函数的定义

    @FunctionalInterface
    public interface MethodExecuteFunctional {
      Object execute();
    }
    
    • 1
    • 2
    • 3
    • 4

    2.2、时间轮核心代码

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import org.apache.commons.collections4.CollectionUtils;
    
    
    public class WheelSecondTimerExecutor {
    
        private static final long DAY_SECOND = 24 * 60 * 60;
    
        // 存放数据部分,key为时间戳对应的秒数,Value为对应的需要执行的任务
        private final static Map<Long, List<MethodExecuteFunctional>> TASK_MAP = new ConcurrentHashMap<>();
        // 存放时间戳的秒数
        private final static Set<Long> KEYS = new HashSet<>();
    
        // 当前时间轮是否还没有停止
        private static boolean IS_RUNNING = false;
    
        // 执行时间轮上的任务线程池
        private static ExecutorService WHEEL_TIMER_WORKER = new ThreadPoolExecutor(5, 15,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    
        /**
         * 提交任务到时间轮上, 只接收未来1天内的任务
         *
         * @param executeTime 需要在什么时间戳执行
         * @param functional  执行的方法体
         */
        public static boolean submitTask(long executeTime, MethodExecuteFunctional functional) {
            long leftSecond = executeTime - System.currentTimeMillis();
            // 提交失败,时间间隔不满足
            if (leftSecond > DAY_SECOND * 1000) {
                return false;
            }
            // 得到时间戳的秒数
            executeTime = executeTime / 1000;
            KEYS.add(executeTime);
            List<MethodExecuteFunctional> functionals = TASK_MAP.computeIfAbsent(executeTime, fun -> new ArrayList<>());
            functionals.add(functional);
            // 启动时间轮,进行时间持续时间检测
            start();
            return true;
        }
    
        private static void start() {
            if (IS_RUNNING) {
                return;
            }
            IS_RUNNING = true;
            new Thread(() -> {
                Thread.currentThread().setName("WheelSecondTimerExecutorThread");
                try {
                    while (!KEYS.isEmpty()) {
                        long currentSecond = System.currentTimeMillis() / 1000;
                        List<Long> currentRunIds = KEYS.stream().filter(time -> time <= currentSecond).collect(Collectors.toList());
                        if (CollectionUtils.isNotEmpty(currentRunIds)) {
                            List<MethodExecuteFunctional> funs = currentRunIds.stream().map(TASK_MAP::get).flatMap(Collection::stream)
                                    .collect(Collectors.toList());
                            currentRunIds.forEach(TASK_MAP::remove);
                            KEYS.removeAll(currentRunIds);
                            if (CollectionUtils.isNotEmpty(funs)) {
                                // 将任务放在线程池中
                                for (MethodExecuteFunctional functional : funs) {
                                    WHEEL_TIMER_WORKER.submit(functional::execute);
                                }
                            }
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    log.error("WheelSecondTimerExecutor excute task error", e);
                } finally {
                    // 时间轮上的任务执行完成后,停止检测
                    IS_RUNNING = false;
                }
            }).start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
  • 相关阅读:
    redis的下载和安装详解
    20220802NOI模拟赛--考后总结
    应用方案 | 内置ALC的音频前置放大器D2538A和D3308芯片
    CPI教程-异步接口创建及使用
    Lumiprobe无铜点击化学解决方案
    JavaWeb 尚硅谷书城项目
    <数据结构> - 数据结构在算法比赛中的应用(上)
    目前最好用的NAS系统是什么?
    在fastapi中实现异步
    【11.23+11.24】Codeforces 刷题
  • 原文地址:https://blog.csdn.net/lvyanqin2013/article/details/127439213