在执行一个异步任务或并发任务时,往往是通过直接new Thread()方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池,线程池的优势很明显,如下:
java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等
Java API针对不同需求,利用Executors类提供了不同的线程池
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
- public void singleThreadExecutorDemo(){
- ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- for (int i = 0; i < 3; i++) {
- final int index = i;
-
- singleThreadExecutor.execute(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", index="+index);
- }
- });
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- pool-1-thread-1, index=0
- pool-1-thread-1, index=1
- pool-1-thread-1, index=2
从运行结果可以看出,所有任务都是在单一线程运行的。
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。线程池的大小上限为Integer.MAX_VALUE
- public void cachedThreadPoolDemo(){
- ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
- for (int i = 0; i < 5; i++) {
- final int index = i;
-
- cachedThreadPool.execute(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", index="+index);
- }
- });
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- pool-1-thread-1, index=0
- pool-1-thread-1, index=1
- pool-1-thread-1, index=2
- pool-1-thread-1, index=3
- pool-1-thread-1, index=4
-
从运行结果可以看出,整个过程都在同一个线程pool-1-thread-1中运行,后面线程复用前面的线程。
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,超出的线程会在队列中等待;如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
- public void fixedThreadPoolDemo(){
- ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
- for (int i = 0; i < 6; i++) {
- final int index = i;
-
- fixedThreadPool.execute(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", index="+index);
- }
- });
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- pool-1-thread-1, index=0
- pool-1-thread-2, index=1
- pool-1-thread-3, index=2
- pool-1-thread-1, index=3
- pool-1-thread-2, index=4
- pool-1-thread-3, index=5
从运行结果可以看出,线程池大小为3,每休眠1s后将任务提交给线程池的各个线程轮番交错地执行。线程池的大小设置,可参数Runtime.getRuntime().availableProcessors()。
创建一个可定时执行或周期执行任务的线程池,该方法可指定线程池的核心线程个数
- public void scheduledThreadPoolDemo(){
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
- //定时执行一次的任务,延迟1s后执行
- scheduledThreadPool.schedule(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", delay 1s");
- }
- }, 1, TimeUnit.SECONDS);
-
- //周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
- scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", every 3s");
- }
- }, 2, 3, TimeUnit.SECONDS);
- }
-
- pool-1-thread-1, delay 1s
- pool-1-thread-1, every 3s
- pool-1-thread-2, every 3s
- pool-1-thread-2, every 3s
ScheduledExecutorService功能强大,对于定时执行的任务,建议多采用该方法。
以上四种线程池,都是基于ThreadPoolExecutor创建的线程池,只是new ThreadPoolExecutor()的时候参数不同而已。JDK1.8还新增一种线程池newWorkStealingPool:
一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。 newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现;由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。
ThreadPoolExecutor类继承了AbstractExecutorService类,并提供了四个构造器。构造器中分别有一下参数:
6.threadFactory:(线程工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)
7.hander:(线程饱和策略):当线程池和队列都满了,再加入线程会执行此策略。有四种取值: ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常 ThreadPoolExecutor.DiscardPolicy:默默丢弃任务 不进行任何通知ThreadPoolExecutor.DiscardOlddestPolicy:丢弃队列最前的任务,重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:有调用线程处理该任务
所以根据上面分析我们可以看到,FixedThreadPool和SigleThreadExecutor中之所以用LinkedBlockingQueue无界队列,是因为设置了corePoolSize=maxPoolSize,线程数无法动态扩展,于是就设置了无界阻塞队列来应对不可知的任务量;而CachedThreadPool则使用的是SynchronousQueue同步移交队列,为什么使用这个队列呢?因为CachedThreadPool设置了corePoolSize=0,maxPoolSize=Integer.MAX_VALUE,来一个任务就创建一个线程来执行任务,用不到队列来存储任务;SchduledThreadPool用的是延迟队列DelayedWorkQueue。在实际项目开发中也是推荐使用手动创建线程池的方式,而不用默认方式,关于这点在《阿里巴巴开发规范》中是这样描述的:
上面说了使用Executors工具类创建的线程池有隐患,那如何使用才能避免这个隐患呢?建立自己的线程工厂类,灵活设置关键参数
//这里默认拒绝策略为AbortPolicy private static ExecutorService executor = new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));
使用guava包中的ThreadFactoryBuilder工厂类来构造线程池:
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
-
- private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());
通过guava的ThreadFactory工厂类还可以指定线程组名称,这对于后期定位错误时也是很有帮助的
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-d%").build();
向线程池提交的任务有两种:Runnable和Callable,二者的区别如下:
Callable是JDK1.5时加入的接口,作为Runnable的一种补充,允许有返回值,允许抛出异常。
三种提交任务的方式:
提交方式 是否关心 返回结果
Future<T> submit(Callable<T> task) 是
void execute(Runnable command) 否
Future<?> submit(Runnable task) 否,虽然返回Future,但是其get()方法总是返回null
@FunctionalInterface public interface Runnable { public abstract void run(); } @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Future接口用来表示执行异步任务的结果存储器,当一个任务的执行时间过长就可以采用这种方式:把任务提交给子线程去处理,主线程不用同步等待,当向线程池提交了一个Callable或Runnable任务时就会返回Future,用Future可以获取任务执行的返回结果。Future的主要方法包括:
get()方法:返回任务的执行结果,若任务还未执行完,则会一直阻塞直到完成为止,如果执行过程中发生异常,则抛出异常,但是主线程是感知不到并且不受影响的,除非调用get()方法进行获取结果则会抛出ExecutionException异常;
get(long timeout, TimeUnit unit):在指定时间内返回任务的执行结果,超时未返回会抛出TimeoutException,这个时候需要显式的取消任务;
cancel(boolean mayInterruptIfRunning):取消任务,boolean类型入参表示如果任务正在运行中是否强制中断;
isDone():判断任务是否执行完毕,执行完毕不代表任务一定成功执行,比如任务执行失但也执行完毕、任务被中断了也执行完毕都会返回true,它仅仅表示一种状态说后面任务不会再执行了;
isCancelled():判断任务是否被取消;
下面来实际演示Future和FutureTask的用法:
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- Future<Integer> future = executorService.submit(new Task());
- Integer integer = future.get();
- System.out.println(integer);
- executorService.shutdown();
- }
-
- static class Task implements Callable<Integer> {
- @Override
- public Integer call() throws Exception {
- System.out.println("子线程开始计算");
- int sum = 0;
- for (int i = 0; i <= 100; i++) {
- sum += i;
- }
- return sum;
- }
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- FutureTask<Integer> futureTask = new FutureTask<>(new Task());
- executorService.submit(futureTask);
- Integer integer = futureTask.get();
- System.out.println(integer);
- executorService.shutdown();
- }
-
- static class Task implements Callable<Integer> {
- @Override
- public Integer call() throws Exception {
- System.out.println("子线程开始计算");
- int sum = 0;
- for (int i = 0; i <= 100; i++) {
- sum += i;
- }
- return sum;
- }
- }
shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表
shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略
isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true
1、判断核心线程池是否已满,没满则创建一个新的工作线程来执行任务。已满则。
2、判断任务队列是否已满,没满则将新提交的任务添加在工作队列,已满则。
3、判断整个线程池是否已满,没满则创建一个新的工作线程来执行任务,已满则执行饱和策略。
(1、判断线程池中当前线程数是否大于核心线程数,如果小于,在创建一个新的线程来执行任务,如果大于则
2、判断任务队列是否已满,没满则将新提交的任务添加在工作队列,已满则。
3、判断线程池中当前线程数是否大于最大线程数,如果小于,则创建一个新的线程来执行任务,如果大于,则执行饱和策略。)
因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。
阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。
使得在线程不至于一直占用cpu资源。
(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如下
while (task != null || (task = getTask()) != null) {})。
不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢?
CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。
IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。
混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
当前在JDK中默认使用的线程池 ThreadPoolExecutor,在具体使用场景中,有以下几个缺点
1.core线程一般不会timeOut
2.新任务提交时,如果工作线程数小于 coreSize,会自动先创建线程,即使当前工作线程已经空闲,这样会造成空闲线程浪费
3.设置的maxSize参数只有在队列满之后,才会生效,而默认情况下容器队列会很大(比如1000)
如一个coreSize为10,maxSize为100,队列长度为1000的线程池,在运行一段时间之后的效果会是以下2个效果:
1.系统空闲时,线程池中始终保持10个线程不变,有一部分线程在执行任务,另一部分线程一直wait中(即使设置allowCoreThreadTimeOut)
2.系统繁忙时,线程池中线程仍然为10个,但队列中有还没有执行的任务(不超过1000),存在任务堆积现象
本文将描述一下简单版本的线程池,参考于 Tomcat ThreadPoolExecutor, 实现以下3个目标
1.新任务提交时,如果有空闲线程,直接让空闲线程执行任务,而非创建新线程
2.如果coreSize满了,并且线程数没有超过maxSize,则优先创建线程,而不是放入队列
3.其它规则与ThreadPoolExecutor一致,如 timeOut机制
首先看一下ThreadPoolExecutor的执行逻辑, 其基本逻辑如下
1.如果线程数小于coreSize,直接创建新线程并执行(coreSize逻辑)
2.尝试放入队列
3.放入队列失败,则尝试创建新线程(maxSize逻辑)
而执行线程的任务执行逻辑,就是不断地从队列里面获取任务并执行,换言之,即如果有执行线程,直接往队列里面放任务,执行线程就会被通知到并直接执行任务
空闲线程优先
空闲线程优先在基本逻辑中,即如果线程数小于coreSize,但如果有空闲线程,就取消创建线程的逻辑. 在有空闲线程的情况下,直接将任务放入队列中,即达到任务执行的目的。
这里的逻辑即是直接调整默认的ThreadPoolExecutor逻辑,通过重载 execute(Runnable) 方法达到效果. 具体代码如下所示:
public void execute(Runnable command) { //此处优先处理有活跃线程的情况,避免在<coreSize时,直接创建线程 if(getActiveCount() < getPoolSize()) { if(pool1.offer(command)) { return; } } super.execute(command); }
coreSize满了优先创建线程
从之前的逻辑来看,如果放入队列失败,则尝试创建新线程。在这个时候,相应的coreSize肯定已经满了。那么,只需要处理一下逻辑,将其offer调整为false,即可以实现相应的目的。
这里的逻辑,即是重新定义一个BlockingDeque,重载相应的offer方法,相应的参考如下:
public boolean offer(Runnable o) { //这里的parent为ThreadPoolExecutor的引用 int poolSize = parent.getPoolSize(); int maxPoolSize = parent.getMaximumPoolSize(); //还没到最大值,先创建线程 if(poolSize < maxPoolSize) { return false; } //默认逻辑 return super.offer(o); }
在ThreadPoolExecutor类中有两个比较重要的方法引起了我们的注意:beforeExecute和afterExecute
这两个方法是protected修饰的,很显然是留给开发人员去重写方法体实现自己的业务逻辑,非常适合做钩子函数,在任务run方法的前后增加业务逻辑,比如添加日志、统计等。这个和我们springmvc中拦截器的preHandle和afterCompletion方法很类似,都是对方法进行环绕,类似于spring的AOP
spring boot线程池的正确使用方式_代码最诚实的朋友的博客-CSDN博客_springboot中使用线程池
- /**
- * @ClassName ThreadPoolConfig
- * @Description 配置类中构建线程池实例,方便调用
- */
- @Configuration
- public class ThreadPoolConfig {
- @Bean(value = "threadPoolInstance")
- public ExecutorService createThreadPoolInstance() {
- //通过guava类库的ThreadFactoryBuilder来实现线程工厂类并设置线程名称
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build();
- ExecutorService threadPool = new ThreadPoolExecutor(10, 16, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());
- return threadPool;
- }
- }
- //通过name=threadPoolInstance引用线程池实例
- @Resource(name = "threadPoolInstance")
- private ExecutorService executorService;
-
- @Override
- public void spikeConsumer() {
- //TODO
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- //TODO
- //执行业务逻辑
- }});
- }
1.线程池里执行的是任务,核心逻辑在ThreadPoolExecutor类的execute方法中,同时ThreadPoolExecutor中维护了HashSet<Worker> workers;
2.addWorker()方法来创建线程执行任务,如果是核心线程的任务,会赋值给Worker的firstTask属性;
3.Worker实现了Runnable,本质上也是任务,核心在run()方法里;
4.run()方法的执行核心runWorker(),自旋拿任务while (task != null || (task = getTask()) != null)),task是核心线程Worker的firstTask或者getTask();
5.getTask()的核心逻辑:
1.若当前工作线程数量大于核心线程数->说明此线程是非核心工作线程,通过poll()拿任务,未拿到任务即getTask()返回null,然后会在processWorkerExit(w, completedAbruptly)方法释放掉这个非核心工作线程的引用;
2.若当前工作线程数量小于核心线程数->说明此时线程是核心工作线程,通过take()拿任务
3.take()方式取任务,如果队列中没有任务了会调用await()阻塞当前线程,直到新任务到来,所以核心工作线程不会被回收; 当执行execute方法里的workQueue.offer(command)时会调用Condition.singal()方法唤醒一个之前阻塞的线程,这样核心线程即可复用