Executor作为一个底层接口,定义了线程的执行方法:
public interface Executor {
void execute(Runnable command);
}
其中该包中的ExecutorService继承了Executor是一个运用更广泛的接口,它直接定义了线程池的操作方法:
public interface ExecutorService extends Executor {
// 关闭线程池,会等待正在执行中的线程执行完
void shutdown();
// 立刻关闭线程池,如果有正在执行的线程,会被中断
List<Runnable> shutdownNow();
// 判断线程池是否被关闭了
boolean isShutdown();
// 判断线程池是否终止了,这是线程池的最终状态
boolean isTerminated();
// 等待给定时间后终止线程池
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个执行任务,返回一个未来的引用,这个引用可以获取到执行结果
<T> Future<T> submit(Callable<T> task);
// 提交一个执行任务,返回一个未来的引用,这个引用可获取到结果是传入的参数result
<T> Future<T> submit(Runnable task, T result);
// 提交一个执行任务,返回一个未来的引用,这个引用可以获取到执行结果,入参跟Callable相比就是入参的范围比较广了,只要实现Runable接口即可
Future<?> submit(Runnable task);
// 执行所有传入的任务,等待所有任务结束才返回,结果都在返回的list里面,
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 跟上面方面相比,会有一个时间限制完成所有的任务,如果时间耗尽,有些任务没执行完,则没有结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 返回任意一个执行的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 给定时间内返回任意一个执行结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而AbstractExecutorService是实现ExecutorService接口的抽象类,定制了线程池的一些通用操作。
ThreadPoolExecutor继承AbstractExecutorService,对线程池的细节进行实现。
ForkJoinPool继承AbstractExecutorService,针对计算密集型的需求,把大的任务拆分成多个小任务(即fork),然后再将多个小任务处理汇总到一个结果上(即join)的设计实现。
ScheduledExecutorService是继承ExecutorService接口的接口,定制了线程循环或者延迟的调度操作接口,而对应的ScheduledThreadPoolExecutor继承了ThreadPoolExecutor跟ScheduledExecutorService,定制线程池管理的线程可以进行循环或者延迟的操作。
同包下有个Executors,给使用者提供上面三个实例类的一些常用的创建实例方法。
如果没有做线程的复用并且并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。
所以为了复用线程就做了一个线程池,线程池为线程的开销和资源不足问题提供了解决方案,通过对多个任务复用线程,线程创建的开销被分摊到了多个任务上。
java中ThreadPoolExecutor就是基础线程池的实现。
ThreadPoolExecutor有七个核心的参数:
线程池有5种状态:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt记录着runState和workerCount,即线程池状态和有效线程数
int c = ctl.get();
// workerCountOf方法取出低29位的值,表示当前活动的线程数
// 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中(调用addWorker创建线程执行任务)
if (workerCountOf(c) < corePoolSize) {
// addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
// 如果为true,根据corePoolSize来判断;
// 如果为false,则根据maximumPoolSize来判断
if (addWorker(command, true))
return;
// 如果添加失败,则重新获取ctl值
c = ctl.get();
}
// 如果不小于corePoolSize,则将任务添加到workQueue队列
// 如果当前线程池是运行状态并且任务添加到队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
// 需要移除该command,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
* 这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount不等于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示线程池状态为STOP或者TIDYING或者TERMINATED,此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 在创建非核心线程时,即core等于false时,判断当前线程数是否大于等于maximumPoolSize,
// 在core等于true时,判断当前线程数是否大于等于corePoolSize,
// 如果大于等于则返回false。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试CAS增加workerCount,如果成功,则跳出所有for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回到第一个for循环外继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet();
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动这个线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// firstTask用它来保存传入的任务
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 禁止在执行任务前对线程进行中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 可以看到在创建Worker时会调用threadFactory来创建一个线程
// newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// addWorker中的t.start()就会触发该run方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。Worker继承了AQS,使用AQS来实现独占锁的功能, 用于判断线程是否空闲以及是否可以被中断。
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:
通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。
ForkJoinPool与ThreadPoolExecutor结构相似,它可以通过规则将一个任务分割为可附加的多个子任务,并行执行这些子任务,最终对子任务的运算结果进行快速排序算法等聚合运算,得到最终结果。
使用ForkJoinPool的第一个原则是确保任务拆分的合理性,它除了拆分任务之外还有另一个更强大的特性,就是它实现了工作窃取。它的线程池中的每一个线程,都有着自己的专属任务队列,线程会优先处理自己队列中的任务,如果队列是空的,就会去其他线程的队列中寻找任务。因此,即便400万个任务当中,某个任务执行时间很长,ForkJoinPool中的其他线程也可以完成其余的任务。而ThreadPoolExecutor就做不到这点了,如果这种情况发生在它身上,其他线程无法接手额外的任务。
它主要用来在给定的延迟之后运行任务,或者定期执行任务。从构造方法可以得知,它做了一个拥有无界队列DelayedWorkQueue的线程池:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
在ScheduledThreadPoolExecutor的任务用ScheduledFutureTask来定义,他包含三个核心参数:
DelayedWorkQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumner小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
创建一个不限制线程数量但是会有过期时间的线程池,其队列使用的是SynchronousQueue,一种添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为1的阻塞队列, 所以提交的任务基本都会新建(如果没有空闲)一个线程去处理。因为使用的是无限大的线程数量,所以当任务过多时会有内存溢出的危险。
这个线程池通常会提高执行许多短期异步任务的程序的性能。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
创建一个线程不会被销毁的线程池,因为使用的是无限大的阻塞队列,所以当任务过多时会有内存溢出的危险。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
创建只有一个线程的线程池,省去切换线程的损耗,因为使用的是无限大的阻塞队列,所以当任务过多时会有内存溢出的危险。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
创建一个可以窃取的ForkJoinPool(当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行)。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
创建一个可调度的线程池,因为底层的线程池的最大线程数量不限,所以当任务过多时会有内存溢出的危险。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
创建一个线程只有1的可调度线程池,如果任务多于一个,任务将按先后顺序执行。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}