1、思路
1.1、 用Map存储时间戳和List任务的对应关系。
- 时间戳的好处不在乎时区,任何时区生成的时间戳都一样
- 看时间轮一格的时间是多少,我设置是1秒。就是需要在这1秒中执行的所有任务
1.2、一个线程分发任务
- 用一个线程,将需要在这一秒执行的任务,丢到线程池中执行
- 我设计的一个线程执行,用了一种比较巧妙的设计,通过变量IS_RUNNING来控制这个线程是否在运行,没运行则new Thread().start()。好处就是,任务走完了,这个线程就销毁了,有新的线程过来,就又再新建一个线程
- 设置线程名称好排查错误
- 1s一格的时间轮,任务分发完后,就sleep 1s,分发任务的时间就先忽略,没有那么高得要求
- 自定义接口函数,而没用Supplier,为了约束任务类型
1.3、线程池执行真正的业务任务
- 线程分发时,拿到需要这一秒执行的任务,就丢到线程池执行
2、代码
2.1、接口函数的定义
@FunctionalInterface
public interface MethodExecuteFunctional {
Object execute();
}
2.2、时间轮核心代码
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
public class WheelSecondTimerExecutor {
private static final long DAY_SECOND = 24 * 60 * 60;
private final static Map<Long, List<MethodExecuteFunctional>> TASK_MAP = new ConcurrentHashMap<>();
private final static Set<Long> KEYS = new HashSet<>();
private static boolean IS_RUNNING = false;
private static ExecutorService WHEEL_TIMER_WORKER = new ThreadPoolExecutor(5, 15,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
public static boolean submitTask(long executeTime, MethodExecuteFunctional functional) {
long leftSecond = executeTime - System.currentTimeMillis();
if (leftSecond > DAY_SECOND * 1000) {
return false;
}
executeTime = executeTime / 1000;
KEYS.add(executeTime);
List<MethodExecuteFunctional> functionals = TASK_MAP.computeIfAbsent(executeTime, fun -> new ArrayList<>());
functionals.add(functional);
start();
return true;
}
private static void start() {
if (IS_RUNNING) {
return;
}
IS_RUNNING = true;
new Thread(() -> {
Thread.currentThread().setName("WheelSecondTimerExecutorThread");
try {
while (!KEYS.isEmpty()) {
long currentSecond = System.currentTimeMillis() / 1000;
List<Long> currentRunIds = KEYS.stream().filter(time -> time <= currentSecond).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(currentRunIds)) {
List<MethodExecuteFunctional> funs = currentRunIds.stream().map(TASK_MAP::get).flatMap(Collection::stream)
.collect(Collectors.toList());
currentRunIds.forEach(TASK_MAP::remove);
KEYS.removeAll(currentRunIds);
if (CollectionUtils.isNotEmpty(funs)) {
for (MethodExecuteFunctional functional : funs) {
WHEEL_TIMER_WORKER.submit(functional::execute);
}
}
}
Thread.sleep(1000);
}
} catch (Exception e) {
log.error("WheelSecondTimerExecutor excute task error", e);
} finally {
IS_RUNNING = false;
}
}).start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91