ThreadPoolExecutor实现的顶层接口是Executor,在接口Executor中用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中,由Executor框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力
- 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
- 提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService则是上层的抽象类:
将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
ThreadPoolExecutor实现最复杂的运行部分:
可以自动创建、管理和复用指定数量的一组线程,适用方只需提交任务即可线程安全,ThreadPoolExecutor内部有状态、核心线程数、非核心线程等属性,广泛使用了CAS和AQS锁机制避免并发带来的冲突问题
提供了核心线程、缓冲阻塞队列、非核心线程、抛弃策略的概念,可以根据实际应用场景进行组合使用
提供了beforeExecute 和afterExecute()可以支持对线程池的功能进行扩展
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* 任务阻塞队列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 非公平的互斥锁(可重入锁)
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程集合一个Worker对应一个线程,没有核心线程的说话,只有核心线程数
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 配合mainLock通过Condition能够更加精细的控制多线程的休眠与唤醒
*/
private final Condition termination = mainLock.newCondition();
/**
* 线程池中线程数量曾经达到过的最大值。
*/
private int largestPoolSize;
/**
* 已完成任务数量
*/
private long completedTaskCount;
/**
* ThreadFactory对象,用于创建线程。
*/
private volatile ThreadFactory threadFactory;
/**
* 拒绝策略的处理句柄
* 现在默认提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
*/
private volatile RejectedExecutionHandler handler;
/**
* 线程池维护线程(超过核心线程数)所允许的空闲时间
*/
private volatile long keepAliveTime;
/**
* 允许线程池中的核心线程超时进行销毁
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池维护线程的最小数量,哪怕是空闲的
*/
private volatile int corePoolSize;
/**
* 线程池维护的最大线程数量,线程数超过这个数量之后新提交的任务就需要进入阻塞队列
*/
private volatile int maximumPoolSize;
核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程
newCachedThreadPool是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。
会出下面大量的线程对象,导致的OOM
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
执行大量短生命周期任务。因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。
它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景,SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,使用无界队列LinkedBlockingQueue作为线程池的工作队列
newSingleThreadExecutor 创建是一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
**适用于串行执行任务场景**
会存在出现阻塞队列堆积过大,导致的OOM
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize等于maximumPoolSize,所以线程池中只有核心线程,使用无界阻塞队列LinkedBlockingQueue作为工作队列
适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
newFixedThreadPool:创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。
会存在出现阻塞队列堆积大,导致的OOM
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newScheduledThreadPool 创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。
线程总数阈值为Integer.MAX_VALUE,工作队列使用DelayedWorkQueue,非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。
会存在出现阻塞队列堆积过大,导致的OOM
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
可以看出来上面的方法一共使用了DelayedWorkQueue、LinkedBlockingQueue 和 SynchronousQueue。这个就是线程核心之一的阻塞队列。
它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;
直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;
有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。
在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
无界的任务队列:无界任务队列可以使用LinkedBlockingQueue实现,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
优先任务队列:优先任务队列通过PriorityBlockingQueue实现,使用平衡二叉树堆,实现的具有优先级的无界阻塞队列
任务会按优先级重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。
PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
其实LinkedBlockingQueue也是可以设置界限的,它默认的界限是Integer.MAX_VALUE。同时也支持也支持构造的时候设置队列大小。
无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
当Executor已经关闭,即执行了executorService.shutdown()方法后,或者Executor将有限边界用于最大线程和工作队列容量,且已经饱和时。使用方法execute()提交的新任务将被拒绝. 在以上述情况下,execute方法将调用其RejectedExecutionHandler的rejectExecution()方法
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
也称为终止策略,遭到拒绝将抛出运行时RejectedExecutionException。业务方能通过捕获异常及时得到对本次任务提交的结果反馈。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
拥有自主反馈控制,让提交者执行提交任务,能够减缓新任务的提交速度。这种情况是需要让所有的任务都执行完毕。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
拒绝任务的处理程序,静默丢弃任务。使用此策略,我们可能无法感知系统的异常状态。慎用~!
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
丢弃队列中最前面的任务,然后重新提交被拒绝的任务。是否要使用此策略需要看业务是否需要新老的替换,慎用~!(LRU)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
根据任务所需要的cpu和io资源的量可以分为,
为了合理最大限度的使用系统资源同时也要保证的程序的高性能,可以给CPU密集型任务和IO密集型任务配置一些线程数。