尺有所短,寸有所长;不忘初心,方得始终。
请关注公众号:星河之码
线程池技术是一种多线程处理形式,将任务添加到队列中,通过线程池中创建出来的现成执行这些任务,省去了创建线程和销毁线程的消耗,我认为有点类似JAVA中Spring托管对象的思想在里面。
使用线程池可以根据系统的需求和硬件环境灵活的控制线程的数量,对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力。总的来说有三点:
降低资源消耗
通过系统的需求和硬件环境创建线程数,重复利用已创建的线程,降低线程创建和销毁造成的消耗。
提高响应速度
当任务到提交时,不需要创建线程,立即可以执行,执行完不需要销毁线程,线程池回收,最大限度的提高效率。
提高线程的可管理性
线程由线程池进行统一分配、调优和监控,提高系统的稳定性。
理解了上面为啥要用线程池,有了目的也就有了方向,有了方向就知道设计一个线程池思路了
先要有一个队列,存储要执行的任务
线程数有限,多余任务放在队列中等待执行
线程池中放多少个线程,同一时间线程池中可以执行多少个线程,得有一个大小限制
线程池也不能无限大小
啥时候创建线程放在线程池中呢
所有的线程都是一直存活不销毁的吗
队列满了以后,后面的任务怎么处理
线程池本身的生命周期时什么
当我们理解了这些问题之后,设计一个线程就会有方向跟思路了,同样的,我们也能围绕这些问题更好的去理解线程池的原理实现了。
在Java中,通过ThreadPoolExecutor 实现线程池技术,线程池的工作原理机其实主要就是在说ThreadPoolExecutor的工作流程
ThreadPoolExecutor是jdk自带的,在java.util.concurrent包中
ThreadPoolExecutor有四个构造方法,可以提供给我们创建线程池实例。
实际上四个构造方法中真正被使用的是第四个,也就是有7个参数的那个,其他三个都是通过默认的参数调用的第四个,源码如下
在第四个构造方法中才是真正创建ThreadPoolExecutor对象的地方
通过上述的源码中的构造函数可以看到,创建线程池一共有7个 参数
corePoolSize
线程池核心线程数最大值
maximumPoolSize
线程池最大线程数大小
keepAliveTime
线程池中非核心线程空闲的存活时间大小
unit
线程空闲存活时间单位
workQueue
存放任务的阻塞队列
threadFactory
用于设置创建线程的工厂,可以给创建的线程设置名字,可方便排查问题。
一般业务中会有多个线程池,可以根据线程池的名字去定位是哪一个线程出问题
handler
线程池的饱和策略事件,主要有四种类型。
当提交一个新的任务到线程池时,首先判断线程池中的存活线程数是否小于corePoolSize,如果小于说明有空闲的核心线程,线程池会创建一个核心线程去处理提交的任务。
如果线程池中的存活线程数是已经等于corePoolSize,新提交的任务会被存放进任务队列workQueue排队等待执行。
如果存活线程数已满,并且任务队列workQueue也满了,线程池会判断存活线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没满,则创建一个非核心线程执行提交的任务。
如果当前存活的线程数达到了maximumPoolSize,则针对新的任务直接采用拒绝策略处理。
当线程池中的活跃线程数达到【核心线程数】的时候,新的任务就会别放在阻塞队列中等待执行。
在ThreadPoolExecutor 的构造函数中的有一个BlockingQueue的参数。常用的阻塞队列都是BlockingQueue接口的实现类,常用的主要有以下几种:
ArrayBlockingQueue
有界队列:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue
可设置容量队列:基于链表结构的阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
DelayQueue
延迟队列:任务定时周期的延迟执行的队列,根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序
PriorityBlockingQueue
优先级队列:支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排列
SynchronousQueue
同步队列:不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
拒绝策略也叫饱和策略:当线程池中的活跃线程数达到【最大线程数】的时候,线程池接收到新的任务就会执行拒绝策略。
在ThreadPoolExecutor 的构造函数中的有一个RejectedExecutionHandler的参数。jdk默认有是个实现类,即四种拒绝策略
AbortPolicy
丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。
使用这个策略必须处理抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy
直接丢弃任务,不做任何处理。
DiscardOldestPolicy
丢弃阻塞队列 workQueue 中最老的一个任务,将当前这个任务继续提交给线程池,加入队列中。
CallerRunsPolicy(交给线程池调用所在的线程进行处理)
交给线程池调用所在的线程进行处理,即调用线程池的主线程处理。
由于是主线程自己处理,相当于没有用线程池,一般并发比较小,性能要求不高可以用,否则可能导致程序阻塞。
在日常开发中,我们可能需要根据实际的业务场景调整线程池的参数,ThreadPoolExecutor针对其构造方法为我们提供了几个方法可以调整常用的参数
以上参数也可以通过动态配置中心进行动态修改。
线程池中最重要的参数,API以及提供了动态更新的方法。可以配合动态配置中心进行动态修改。
当我们需要对线程池参数进行调整,而又很难确定 corePoolSize, workQueue,maximumPoolSize等参数达到什么样的大小时才符合业务指标的时候,我们就需要对线程池进行监控。
同样的ThreadPoolExecutor也提供了可以监控线程池的使用情况的几个方法:
方法 | 含义 |
---|---|
getActiveCount() | 线程池中正在执行任务的线程数量 |
getCompletedTaskCount() | 线程池已完成的任务数量,该值小于等于taskCount |
getCorePoolSize() | 线程池的核心线程数量 |
getLargestPoolSize() | 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize |
getMaximumPoolSize() | 线程池的最大线程数量 |
getPoolSize() | 线程池当前的线程数量 |
getTaskCount() | 线程池已经执行的和未执行的任务总数 |
ThreadPoolExecutor提供了两个方法向线程池提交任务,分别为execute()和submit()方法
execute
execute()方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10,20,5,
TimeUnit.MINUTES,new LinkedBlockingQueue<>() );
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("ThreadPoolExecutorTest 测试----->>>>>子线程执行 : "+ Thread.currentThread().getName());
}
});
}
submit
submit方法用于提交需要返回值的任务。调用submit方法线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值。
get()方法会阻塞主线程直到任务完成,而使用get(long timeout,TimeUnit unit),在指定的时间内会等待任务执行,超时则抛出超时异常,等待时候会阻塞当前线程
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10,20,5,
TimeUnit.MINUTES,new LinkedBlockingQueue<>() );
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("ThreadPoolExecutorTest 测试----->>>>>子线程执行 : " + Thread.currentThread().getName());
}
});
}
ThreadPoolExecutor中submit有三种实现:
submit 为什么能提交任务(Runnable)的同时也能返回任务(Future)的执行结果
通过追踪submit 三个先实现的源码发现,submit方法最终也是调用的execute方法,但是在调用execute之前还调用了一个newTaskFor 方法
继续追踪newTaskFor 方法,发现newTaskFor 将 task 封装成了 RunnableFuture,最终返回的是FutureTask 这个类。而FutureTask的继承关系如下
到这里就清晰了,submit之所以提交任务(Runnable)的同时也能返回任务(Future)的执行结果,主要分为两步:
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow()。
这两个的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程。
shutdown()
不会立即终止线程池,只是将线程池的状态设置成shutdown状态,此时不会接受新的任务,然后等所有任务z都执行完后才终止(包含正在执行和缓存队列中的任务)。
shutdownNow()
立即终止线程池,将线程池的状态设置成STOP,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
可以看到上图中还有一个isShutdown的方法,实际上还有其他两个方法
isShutDown
当调用shutdown()或shutdownNow()方法后返回为true
isTerminated
当调用线程池关闭,并且所有提交的任务都执行完成后返回为true
isTerminating
当调用线程池关闭,并且所有提交的任务都执行完成后返回为false
因此,判断线程池所有线程是否执行完成,可以这样写:
while(true){
if(threadPool.isTerminated()) {
//当调用线程池关闭,并且所有提交的任务都执行完成 跳出循环
break;//true停止
}
Thread.sleep(500);//休眠500继续循环
}
线程有六种状态,线程池同样也有状态,在ThreadPoolExecutor的源码中定义了线程池的五种运行状态,分别是:Running、ShutDown、Stop、Tidying、Terminated。
private static final int RUNNING = -1 << COUNT_BITS; // 运行中
private static final int SHUTDOWN = 0 << COUNT_BITS; // 已关闭
private static final int STOP = 1 << COUNT_BITS; // 已停止
private static final int TIDYING = 2 << COUNT_BITS; // 清洁中
private static final int TERMINATED = 3 << COUNT_BITS; // 已终止
Running
ShutDown
STOP
Tidying
状态说明:当所有的任务已终止,任务数量为0,线程池会变为TIDYING状态。
当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
状态切换:
当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
Terminated
Java通过Executors提供六种线程池,分别为:
通过Executors源码可以看到,jdk为我们提供了6种线程池,每种提供了有参和物参两个构造方法。
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。源码如下
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点
通过以上源码可以看出newFixedThreadPool具有以下特点:
工作机制
使用场景
newFixedThreadPool适用于处理CPU密集型的任务,能够保证CPU在工作时尽可能的减少线程分配,即适用执行长期稳定的任务。
案例
线程池大小为10,每个任务输出index后sleep 1秒,因此每秒会打印10个数字。
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
final int index = i;
executor.execute(() -> {
try {
System.out.println(index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
newFixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。同时在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。因此定长线程池的大小一般要根据系统资源进行设置。
创建一个可缓存线程池,当线程空闲时可灵活回收空闲线程,当有新任务时,没有空闲线程则新建线程。源码如下
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点
工作机制
使用场景
newCachedThreadPool一般用于并发大,执行时间短的小任务。
案例
如下案例中线程执行休眠了秒钟,任务的提交速度会大于线程执行的速度
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
final int index = i;
executor.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(index);
});
}
}
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程。这种情况下由于大量线程同时运行,很有会耗尽 CPU 和内存资源,造成系统OOM。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
创建一个单线程化的线程池,即只创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行,源码如下:
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
特点
可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活跃的。
工作机制
使用场景
适用于串行执行任务的场景,一个任务一个任务地执行。
案例
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
final int index = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在执行输出数字:" + index);
});
}
}
输出结果如下,从结果可知是同一个线程执行了100个任务
创建一个定长的线程池,支持定时及周期性任务执行。源码如下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
特点
工作机制
使用场景
周期性执行任务的场景,
需要限制线程数量的场景
案例
这个线程池的案例分为两种:按某种速率周期执行 和 在某个延迟后执行
在某个延迟后执行
public static void main(String[] args) {
/**
* 创建一个给定初始延迟的间隔性的任务,案例中是2秒
* 后面的每次执行时间是上一次任务从执行到结束所需要的时间+给定的间隔时间(2秒)
*/
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
executor.schedule(() -> {
System.out.println(Thread.currentThread().getName() + "正在执行,delay 2 seconds");
}, 2, TimeUnit.SECONDS);
}
}
按某种速率周期执行
public static void main(String[] args) {
/**
* 创建一个给定初始延迟的间隔性的任务,.
* 后面每次任务执行时间为 初始延迟 + N * delay(间隔)
*/
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
executor.scheduleAtFixedRate(() -> {
System.out.println("延迟 2 秒,每 4 秒执行一次");
}, 2, 4, TimeUnit.SECONDS);
}
}
通过newSingleThreadScheduledExecutor的源码可以知道,其本质上很newScheduledThreadPool一样,也是通过ScheduledThreadPoolExecutor创建线程池,并且核心线程数为1
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
特点
工作机制
产生一个线程池大小为1的ScheduledExecutorService对象,当任务多于一个时将按先后顺序执行。
案例
由于其实现也是ScheduledThreadPoolExecutor对象创建,其案例实现跟newScheduledThreadPool一样的
newWorkStealingPool即任务窃取线程池:具有抢占式操作的线程池,每个线程都有一个任务队列存放任务
通过以下源码得知,newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,使用ForkJoinPool的好处是,把1个任务拆分成多个【小任务】,把这些小任务分发到多个线程上执行。这些小任务都执行完成后,再将结果合并。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Runtime.getRuntime().availableProcessors()是获取当前系统可以的CPU核心数。
ThreadPoolExecutor和ForkJoinPool都是在统一的一个Executors类中实现
特点
可以传入线程的数量,不传入,则默认使用当前计算机中可用的cpu数量
合理的使用CPU进行对任务操作(并行操作),所以newWorkStealingPool适合使用在很耗时的操作。
工作机制
newWorkStealingPool默认会以当前机器的CPU处理器个数为线程个数,并行处理任务,且不保证顺序,同时并发数能作为参数设置,WorkStealingPool能够做到并行执行与设置的并发数相同的任务数量,多余的任务则会进入等待队列,等待被执行。
案例
public static void main(String[] args) throws InterruptedException {
// 线程数
int threads = 10;
// 用于计数线程是否执行完成
CountDownLatch countDownLatch = new CountDownLatch(threads);
ExecutorService executorService = Executors.newWorkStealingPool();
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
System.out.println(e);
} finally {
countDownLatch.countDown();
}
});
countDownLatch.await();
}
Java通过Executors提供六种线程池,我们一般使用线程池也可以直接通过Executors对象new一个线程池,如下
但是在阿里巴巴Java开发手册,明确指出不允许使用Executors静态工厂构建线程池,这又是为什么呢
其主要原因是:【规避资源耗尽,清晰线程池的运行规则】。
Executors返回的线程池对象的弊端
FixedThreadPool 和 SingleThreadPool
这两个线程池允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
CachedThreadPool 和 ScheduledThreadPool
这两个线程池允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
创建线程池的正确方式
避免使用Executors创建线程池,主要是避免使用其中的默认实现,我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量,设置最大线程数。
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
创建线程池也可以使用开源类库:开源类库,如apache和guava等。