ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
//1. execute
// 和普通线程池执行一样
executor.execute(() -> {
System.out.println("execute");
});
//2. schedule
// 指定延迟时间,一次性执行任务
executor.schedule(() -> {
System.out.println("schedule");
},2000,TimeUnit.MILLISECONDS);
//3. AtFixedRate
// 周期性执行任务(周期时间:执行时间和延迟时间的最大值)
executor.scheduleAtFixedRate(() -> {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("at:" + System.currentTimeMillis());
},3000,2000,TimeUnit.MILLISECONDS);
//4. WithFixedDelay
// 周期性执行任务(周期时间:执行时间 + 延迟时间)
executor.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("with:" + System.currentTimeMillis());
},3000,2000,TimeUnit.MILLISECONDS);
}
ScheduledFutureTask 实现了 RunnableScheduledFuture 接口,间接的实现了 Delayed 接口,让任务可以放到延迟队列中,并且基于二叉堆做排序
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** 计数器,每个任务都有一个全局唯一的序号
如果任务的执行时间一模一样,比对sequenceNumber */
private final long sequenceNumber;
/** 任务执行的时间,单位是纳秒 */
private long time;
/*
* period == 0:表示一次性执行的任务
* period > 0:表示使用的是 scheduleAtFixedRate
* period < 0:表示使用的是 scheduleWithFixedDelay
* */
private final long period;
/** 周期性执行任务时,引用具体任务,方便后面重新扔到阻塞队列 */
RunnableScheduledFuture<V> outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex;
// 实现Delayed接口重写的方法,执行时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 实现Delayed接口重写的方法,比较方式
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
DelayedWorkQueue 包装了 RunnableScheduledFuture>[]
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 初始长度
private static final int INITIAL_CAPACITY = 16;
// 数组
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 锁
private final ReentrantLock lock = new ReentrantLock();
// 长度
private int size = 0;
// 等待拿堆顶数据的线程
private Thread leader = null;
// Condition 队列
private final Condition available = lock.newCondition();
}
直接调用 schedule(),入参 delay = 0
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
延迟一段时间,执行一次任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
// 非空校验
if (callable == null || unit == null)
throw new NullPointerException();
// 将任务封装成 ScheduledFutureTask
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
// 延迟执行任务
delayedExecute(t);
return t;
}
// 返回当前任务要执行的系统时间
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
delayedExecute() 延迟执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池状态不是RUNNING,就拒绝任务
if (isShutdown())
reject(task);
else {
// 将任务放入延迟队列中(二叉堆)
super.getQueue().add(task);
// 1.线程池状态
// 2.根据策略决定是否能执行
// 3.将任务从延迟队列移除
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 是否需要创建线程
ensurePrestart();
}
}
// periodic - true:代表是周期性执行的任务
// periodic - false:代表是一次性的延迟任务
boolean canRunInCurrentRunState(boolean periodic) {
// 默认情况下,如果任务扔到了延迟队列中,有两个策略
// 如果任务是周期性执行的,默认为false(continueExistingPeriodicTasksAfterShutdown)
// 如果任务是一次性的延迟任务,默认为true(executeExistingDelayedTasksAfterShutdown)
// 此时,周期性任务返回false,一次性任务返回true
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// 判断当前任务到底执行不执行
final boolean isRunningOrShutdown(boolean shutdownOK) {
// 重新拿到线程池的ctl
int rs = runStateOf(ctl.get());
// 如果线程池是RUNNING,返回true
// 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
// 是否需要创建线程
void ensurePrestart() {
// 获取线程池中的工作线程个数。
int wc = workerCountOf(ctl.get());
// 如果工作线程个数,小于核心线程数,
if (wc < corePoolSize)
// 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
addWorker(null, true);
// 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
else if (wc == 0)
// 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
addWorker(null, false);
}
scheduleAtFixedRate 在包装 ScheduledFutureTask 时会将 period 设置为正数,代表固定周期执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将任务设置给outerTask属性,方便后期重新扔到延迟队列
sft.outerTask = t;
// 延迟执行任务
delayedExecute(t);
return t;
}
scheduleWithFixedDelay 在包装 ScheduledFutureTask 时会将 period 设置为负数,代表在执行任务完毕后,再计算下次执行的时间
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将任务设置给outerTask属性,方便后期重新扔到延迟队列
sft.outerTask = t;
// 延迟执行任务
delayedExecute(t);
return t;
}
执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。
// 执行任务
public void run() {
// 获取任务是否是周期执行
// true:周期执行
// false:一次的延迟执行
boolean periodic = isPeriodic();
// 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
if (!canRunInCurrentRunState(periodic))
// 取消任务
cancel(false);
else if (!periodic)
// 当前任务是一次性的延迟执行。执行任务具体的run方法
ScheduledFutureTask.super.run();
// 周期性任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下次任务运行时间
setNextRunTime();
// 重新将任务扔到延迟队列中
reExecutePeriodic(outerTask);
}
}
// 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
private void setNextRunTime() {
// 拿到当前任务的period
long p = period;
// period > 0:At
if (p > 0)
// 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
time = time + p;
else
// period < 0:With
// 任务执行完,拿当前系统时间计算下次执行的时间点
time = now() + p;
}
// 重新将任务扔到延迟队列中
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 线程池状态的判断
if (canRunInCurrentRunState(true)) {
// 将任务扔到了延迟队列中
super.getQueue().add(task);
// 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 是否需要创建线程
ensurePrestart();
}
}