• Java并发编程学习三:线程池


    一、线程池简介

    1. 为什么要用线程池

    未使用线程池时,需要手动创建线程。当需要执行的任务很多时,需要创建很多的线程去处理。如果采用这种方式,会有以下问题:

    • 反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的资源比线程执行任务本身消耗的资源还要大。
    • 过多的线程会占用过多的内存等资源,还会带来过多的上下文切换,同时还会导致系统不稳定

    那任务很多的时候该怎么办呢?不创建这么多线程,都在主线程串行执行,那效率也太低了。这时候,就需要使用线程池了。

    线程池解决上述问题的思路是:

    • 针对反复创建线程开销大的问题,线程池用一些固定的线程一直保持工作状态并反复执行任务
    • 针对过多线程占用太多内存资源的问题,解决思路更直接,线程池会根据需要创建线程,控制线程的总数量,避免占用过多内存资源

    2. 如何使用线程池

    /** 
    * 描述: 用固定线程数的线程池执行10000个任务 
    */ 
    public class ThreadPoolDemo { 
     
        public static void main(String[] args) { 
            ExecutorService service = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10000; i++) { 
                service.execute(new Task());
            } 
        System.out.println(Thread.currentThread().getName());
        } 
     
        static class Task implements Runnable { 
     
            public void run() { 
                System.out.println("Thread Name: " + Thread.currentThread().getName());
            } 
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    如上所示,创建一个线程池,线程池中有 5 个线程,然后线程池将 10000 个任务分配给这 5 个线程,这 5 个线程反复领取任务并执行,直到所有任务执行完毕,这就是线程池的思想

    3. 使用线程池的好处

    相对于手动创建线程,使用线程池的好处有:

    • 线程池中的线程是可以复用的,只用少量的线程就可以去执行大量的任务,这就大大减小了线程生命周期的开销。
    • 线程池会根据配置和任务数量灵活地控制线程数量,不够的时候就创建,太多的时候就回收,避免线程过多导致内存溢出,或线程太少导致 CPU 资源浪费
    • 线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务

    二、线程池参数

    线程池的核心参数有6个,如下图所示:

    1. corePoolSize 与 maximumPoolSize

    corePoolSize 指的是核心线程数,线程池初始化时线程数默认为 0,当有新的任务提交后,会创建新线程执行任务,如果不做特殊设置,此后线程数通常不会再小于 corePoolSize ,因为它们是核心线程,即便未来可能没有可执行的任务也不会被销毁。

    随着任务量的增加,在任务队列满了之后,线程池会进一步创建新线程,最多可以达到 maximumPoolSize 来应对任务多的场景,如果未来线程有空闲,大于 corePoolSize 的线程会被合理回收。所以正常情况下,线程池中的线程数量会处在 corePoolSize 与 maximumPoolSize 的闭区间内。

    2. keepAliveTime+时间单位

    当线程池中线程数量多于核心线程数时,而此时又没有任务可做,线程池就会检测线程的 keepAliveTime,如果超过规定的时间,无事可做的线程就会被销毁,以便减少内存的占用和资源消耗。如果后期任务又多了起来,线程池也会根据规则重新创建线程

    3. ThreadFactory

    ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。

    4. workQueue 和 Handler

    分别是线程池的阻塞队列和任务拒绝策略,这里不做具体介绍,后面会进行细致介绍

    5. 线程池的处理流程

    • 当提交任务后,线程池首先会检查当前线程数,如果此时线程数小于核心线程数,比如最开始线程数量为 0,则新建线程并执行任务
    • 随着任务的不断增加,线程数会逐渐增加并达到核心线程数,此时如果仍有任务被不断提交,就会被放入 workQueue 任务队列中,等待核心线程执行完当前任务后重新从 workQueue 中提取正在等待被执行的任务
    • 假设任务特别的多,已经达到了 workQueue 的容量上限,这时线程池就会启动后备力量,也就是 maximumPoolSize 最大线程数,线程池会在 corePoolSize 核心线程数的基础上继续创建线程来执行任务。假设任务被不断提交,线程池会持续创建线程直到线程数达到 maximumPoolSize 最大线程数
    • 如果依然有任务被提交,这就超过了线程池的最大处理能力,这个时候线程池就会拒绝这些任务

    三、线程池的类型

    常见的线程池有六种,分别是:FixedThreadPool、CachedThreadPool、ScheduledThreadPool、ScheduledThreadPool、SingleThreadScheduledExecutor和ForkJoinPool。

    1. FixedThreadPool

    这种线程池的核心线程数和最大线程数一样,可以看做是固定线程数的线程池。

    它的特点是线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的,就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。而且就算任务队列满了,到了本该继续增加线程数的时候,由于它的最大线程数和核心线程数是一样的,所以也无法再增加新的线程了。

    2. CachedThreadPool

    这种线程池称为可缓存线程池,它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。

    这种线程池的线程数量不是固定不变的,也有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

    ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) { 
            service.execute(new Task() { 
        });
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    for 循环提交 1000 个任务给 CachedThreadPool,假如执行任务的耗时很久,而 for 循环提交任务的操作是非常快的,就可能导致 1000 个任务都提交完了但第一个任务还没有被执行完,所以此时 CachedThreadPool 就可以动态的伸缩线程数量,随着任务的提交,不停地创建 1000 个线程来执行任务,而当任务执行完之后,假设没有新的任务了,那么大量的闲置线程又会造成内存资源的浪费,这时线程池就会检测线程在 60 秒内有没有可执行任务,如果没有就会被销毁,最终线程数量会减为 0。

    3. ScheduledThreadPool

    这种线程池支持定时或者周期性执行人物,如:每隔10秒执行一次任务。主要的用法有以下三种:

    ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
     
    service.schedule(new Task(), 10, TimeUnit.SECONDS);
     
    service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
     
    service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    上述三种用法的区别是:

    • 第一种方法 schedule 比较简单,表示延迟指定时间后执行一次任务,如果代码中设置参数为 10 秒,也就是 10 秒后执行一次任务后就结束
    • 第二种方法 scheduleAtFixedRate 表示以固定的频率执行任务,它的第二个参数 initialDelay 表示第一次延时时间,第三个参数 period 表示周期,也就是第一次延时后每次延时多长时间执行一次任务。
    • 第三种方法 scheduleWithFixedDelay 与第二种方法类似,也是周期执行任务,区别在于对周期的定义,之前的 scheduleAtFixedRate 是以任务开始的时间为时间起点开始计时,时间到就开始执行第二次任务,而不管任务需要花多久执行;而 scheduleWithFixedDelay 方法以任务结束的时间为下一次循环的时间起点开始计时。
    • 例如,假如period为1h,对于第二种方法,任务执行的时间表为:00:00开始,00:10完成,01:00开始,01:10完成…,对于第二种方法,任务开始执行的时间表为:00:00开始,00:10完成,01:10开始,01:20完成…

    4. SingleThreadExecutor

    这种线程池只有一个线程,它会使用唯一的线程去执行任务,原理和 FixedThreadPool 是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。

    这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景,而前几种线程池不一定能够保障任务的执行顺序等于被提交的顺序,因为它们是多线程并行执行的。

    5. SingleThreadScheduledExecutor

    这种线程和第三种 ScheduledThreadPool 线程池非常相似,它只是 ScheduledThreadPool 的一个特例,内部只有一个线程。

    以核心线程数、最大线程数,以及线程存活时间三个维度对上述五种线程池进行对比:

    6. ForkJoinPool

    这种线程池是在JDK 7中引入的,它的主要用法和之前的线程池是相同的,也是把任务交给线程池去执行,线程池中也有任务队列来存放任务。这种线程池默认线程数等于系统上的 CPU 数,也可以自己指定。

    这种线程池和之前的线程池有两大不同之处:子任务和内部结构

    a. 子任务
    对于一个Task,这个 Task 可以产生三个子任务,三个子任务并行执行完毕后将结果汇总给 Result。例如:主任务需要执行非常繁重的计算任务,就可以把计算拆分成三个部分,这三个部分是互不影响相互独立的,这样就可以利用 CPU 的多核优势,并行计算,然后将结果进行汇总。这里涉及到两个步骤,一个是拆分子任务(Fork),一个是汇总结果(Join),这也是线程池名称的由来

    以下是ForkJoin 的子任务模式,如图所示,子任务同样会产生子子任务,最后再逐层汇总,得到最终的结果:

    利用ForkJoinPool可以很方便的解决Fibonacci数问题,具体如下:

    class Fibonacci extends RecursiveTask<Integer> { 
     
        int n;
     
        public Fibonacci(int n) { 
            this.n = n;
        } 
     
        @Override
        public Integer compute() { 
            if (n <= 1) { 
                return n;
            } 
    	    Fibonacci f1 = new Fibonacci(n - 1);
    	    f1.fork();
    	    Fibonacci f2 = new Fibonacci(n - 2);
    	    f2.fork();
    	    return f1.join() + f2.join();
        } 
    }
    
    public static void main(String[] args) throws ExecutionException, InterruptedException { 
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        for (int i = 0; i < 10; i++) { 
            ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));
            System.out.println(task.get());
        } 
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Fibonacci继承了 RecursiveTask,RecursiveTask 类是ForkJoinTask 的一个子类,对ForkJoinTask 进行了简单地包装,这时我们重写 compute() 方法,当 n<=1 时直接返回,当 n>1 就创建递归任务,也就是 f1 和 f2,然后我们用 fork() 方法分裂任务并分别执行,最后在 return 的时候,使用 join() 方法把结果汇总,这样就实现了任务的分裂和汇总。

    b. 内部结构
    ForkJoinPool 线程池内部除了有一个共用的任务队列之外,每个线程还有一个对应的双端队列 deque,这时一旦线程中的任务被 Fork 分裂了,分裂出来的子任务放入线程自己的 deque 里,而不是放入公共的任务队列中。如果此时有三个子任务放入线程 t1 的 deque 队列中,对于线程 t1 而言获取任务的成本就降低了,可以直接在自己的任务队列中获取而不必去公共队列中争抢也不会发生阻塞(除了后面会讲到的 steal 情况外),减少了线程间的竞争和切换,是非常高效的。

    这里再考虑一种情况,线程有多个,而线程 t1 的任务特别繁重,分裂了数十个子任务,但是 t0 此时却无事可做,它自己的 deque 队列为空,这时为了提高效率,t0 就会想办法帮助 t1 执行任务,这就是“work-stealing”的含义。

    双端队列 deque 中,线程 t1 获取任务的逻辑是后进先出,也就是LIFO(Last In Frist Out),而线程 t0 在“steal”偷线程 t1 的 deque 中的任务的逻辑是先进先出,也就是FIFO(Fast In Frist Out),如图所示,图中很好的描述了两个线程使用双端队列分别获取任务的情景。使用 “work-stealing” 算法和双端队列很好地平衡了各线程的负载。

    以下用一张图总结ForkJoinPool 的整体内部结构:

    四、线程池的阻塞队列

    对于上述介绍的几种主要的线程池,实际使用的阻塞队列只有三种:LinkedBlockingQueue、SynchronousQueue和DelayedWorkQueue,以下是各个线程池和阻塞队列的对应关系:

    1. LinkedBlockingQueue

    FixedThreadPool 和 SingleThreadExector 使用了LinkedBlockingQueue,这种阻塞队列容量为 Integer.MAX_VALUE ,可以认为是无界队列。

    2. SynchronousQueue

    CachedThreadPool使用了SynchronousQueue,这种阻塞队列的容量为0,不存储任何元素。

    由于CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的。因此CachedThreadPool线程池不需要一个任务队列来存储任务,因为一旦有任务被提交就直接转发给线程或者创建新线程来执行,而不需要另外保存它们。

    3. DelayedWorkQueue

    ScheduledThreadPool 和 SingleThreadScheduledExecutor使用了DelayedWorkQueue,这种线程池的最大特点是可以延迟执行任务,如:一定时间后执行任务或者每隔一定时间执行一次任务。同时,DelayedWorkQueue也是一个无界队列。

    DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。

    五、线程池的拒绝策略

    线程池在遇到以下两种情况时会拒绝提交的任务:

    • 调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部依然有没执行完的任务正在执行,但是由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝。
    • 线程池没有能力继续处理新提交的任务(具体原因参见前面提到的线程池处理流程),也就是工作已经非常饱和的时候

    Java 在 ThreadPoolExecutor 类中提供了 4 种默认的拒绝策略来应对不同的场景,都实现了 RejectedExecutionHandler 接口,如图所示,分别是:AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy

    1. AbortPolicy

    在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让线程池使用方感知到任务被拒绝,此时可以根据业务逻辑选择重试或者放弃提交等策略。

    2. DiscardPolicy

    当新任务被提交后直接被丢弃掉,也不会给出任何的通知,相对而言存在一定的风险,因为提交任务的时候根本不知道这个任务会被丢弃,可能造成数据丢失。

    3. DiscardOldestPolicy

    如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,也存在一定的数据丢失风险。

    4. CallerRunsPolicy

    当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务

    这样的好处是:

    • 新提交的任务不会被丢弃,这样也就不会造成业务损失
    • 谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期

    六、正确创建线程池

    线程池的创建有两种方法:一种是通过Executors工具类自动创建,一种是手动创建的。但是,在《Java开发手册》中规定,线程池不允许使用Executors创建,而是应该通过ThreadPoolExecutor方式(即手动创建)。

    那么为什么不允许使用Executors创建?Executors的各种方法可以生成前面提到的常见的线程池,下面分析一下,这些方法会有什么问题。

    1. newFixedThreadPool()方法(×)

    Executors可以通过newFixedThreadPool()方法生成FixedThreadPool线程池,其内部实际还是调用了ThreadPoolExecutor 构造函数:

    public static ExecutorService newFixedThreadPool(int nThreads) { 
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3

    主要的问题在于,它生成的FixedThreadPool线程池默认采用容量没有上限的 LinkedBlockingQueue队列。而对于FixedThreadPool,它线程数量固定的线程池,当任务的处理速度比较慢,那么随着请求的增多,队列中堆积的任务也会越来越多,最终大量堆积的任务会占用大量内存,并发生 OOM。

    2. newSingleThreadExecutor()方法(×)

    public static ExecutorService newSingleThreadExecutor() { 
        return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }
    
    • 1
    • 2
    • 3

    newSingleThreadExecutor 和 newFixedThreadPool 的原理是一样的,只不过把核心线程数和最大线程数都直接设置成了 1,但是任务队列仍是无界的 LinkedBlockingQueue,所以也会导致同样的问题,也就是当任务堆积时,可能会占用大量的内存并导致 OOM。

    3. newCachedThreadPool()方法(×)

    public static ExecutorService newCachedThreadPool() { 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3

    该方法生成的CachedThreadPool,默认采用的任务队列是不存储任何任务的SynchronousQueue队列,这本身到没有什么问题。最大的问题是,最大的线程数是 Integer.MAX_VALUE,基本上等于是不限制线程数量。这样的话,当任务数量特别多的时候,就可能会导致创建非常多的线程,最终超过了操作系统的上限而无法创建新线程,或者导致内存不足。

    4. newScheduledThreadPool()方法(×)

    ScheduledThreadPool 和SingleThreadScheduledExecutor 原理一样,可以通过newScheduledThreadPool()方法创建:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) { 
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里,ScheduledThreadPoolExecutor是ThreadPoolExecutor,实际上最终还是调用了ThreadPoolExecutor的构造函数。

    该方法创建ScheduledThreadPool 和SingleThreadScheduledExecutor,默认采用的是DelayedWorkQueue队列,该队列是一个延迟队列,但同时也是一个无界队列,因此当存放过多的任务时,也会发生OOM。

    5. 正确方法:ThreadPoolExecutor构造函数

    经过以上分析,Executors创建线程池会出现多种多样的问题,正确的方法应该是使用ThreadPoolExecutor构造函数(其实还可以使用一些开源类库,比如apache和guava等),自己手动设定线程池参数创建线程池。从上面的分析来看,其实Executors本质上还是调用ThreadPoolExecutor构造函数。

    如下所示,为使用ThreadPoolExecutor构造函数自定义构建线程池的示例:

    private static ExecutorService executor = 
                new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
    • 1
    • 2

    a. 设定线程数
    自定义线程池时,设定合适的线程数可以充分并合理地使用 CPU 和内存等资源,从而最大限度地提高程序的性能。

    对应不同的任务有不同的线程数量设定策略

    对于CPU密集型任务,如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务,最佳的线程数为 CPU 核心数的 1~2 倍。如果设置过多的线程数,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,此时每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,反而会让性能下降。

    对于耗时IO型任务,如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在任务队列中等待的任务就会减少,可以更好地利用资源。

    Java并发编程实战》的作者 Brain Goetz 推荐了一种计算合理线程数的方法:
    线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)
    如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,线程数就随之减少。

    b. 设定阻塞队列
    除了之前提到的LinkedBlockingQueue 、 SynchronousQueue 和 DelayedWorkQueue,还有一种常用的阻塞队列叫 ArrayBlockingQueue。

    ArrayBlockingQueue是用数组实现的,在新建对象的时候要求传入容量值,且后期不能扩容,所以 ArrayBlockingQueue 的最大的特点就是容量是有限的。这样一来,如果任务队列放满了任务,而且线程数也已经达到了最大值,线程池根据规则就会拒绝新提交的任务,这样一来就可能会产生一定的数据丢失。

    虽然有丢失数据的风险,但是相比于无限增加任务或者线程数导致内存不足,进而导致程序崩溃,这样的风险还是可以接受的。另外,如果设置的线程数合理,ArrayBlockingQueue还是可以尽量避免数据丢失的。

    c. 设定线程工厂
    一般而言,使用默认的defaultThreadFactory即可,但是也可以传入自定义的有额外能力的线程工厂。

    在实际使用时,可能有多个线程池,而不同的线程池之间有必要通过不同的名字来进行区分,所以可以传入能根据业务信息进行命名的线程工厂,以便后续可以根据线程名区分不同的业务进而快速定位问题代码。

    可以使用com.google.common.util.concurrent.ThreadFactoryBuilder来实现自定义线程工厂

    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    ThreadFactory rpcFactory = builder.setNameFormat("rpc-pool-%d").build();
    
    • 1
    • 2

    d. 设定拒绝策略
    除了使用之前提到的四种拒绝策略:AbortPolicy,DiscardPolicy,DiscardOldestPolicy 或者 CallerRunsPolicy。还可以通过实现RejectedExecutionHandler 接口来实现自己的拒绝策略,在接口中我们需要实现 rejectedExecution 方法,在 rejectedExecution 方法中,执行例如打印日志、暂存任务、重新执行等自定义的拒绝策略,以便满足业务需求。

    private static class CustomRejectionHandler implements RejectedExecutionHandler { 
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
            //打印日志、暂存任务、重新执行等拒绝策略
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    七、线程池的关闭

    当线程池使用结束后,需要及时关闭线程池。在 ThreadPoolExecutor 中涉及关闭线程池的方法有以下几种:

    • void shutdown;
    • boolean isShutdown;
    • boolean isTerminated;
    • boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    • List shutdownNow;

    1. shutdown()

    该方法可以安全地关闭一个线程池,调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表 shutdown() 操作是没有任何效果的,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

    2. isShutdown()

    该方法可以返回 true 或者 false 来判断线程池是否已经开始了关闭工作,也就是是否执行了 shutdown 或者 shutdownNow 方法。

    需要注意的是,如果调用 isShutdown() 方法的返回的结果为 true 并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程,也就是说,此时可能线程池中依然有线程在执行任务,队列里也可能有等待被执行的任务。

    3. awaitTermination()

    该方法本身并不是用来关闭线程池的,而是主要用来判断线程池状态的。调用 awaitTermination 方法后当前线程会尝试等待一段指定的时间,如果在等待时间内,线程池已关闭并且内部的任务都执行完毕了,也就是说线程池真正“终结”了,那么方法就返回 true,否则超时返回 fasle。

    例如,给 awaitTermination 方法传入的参数是 10 秒,那么它就会陷入 10 秒钟的等待,直到发生以下三种情况之一:

    • 等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true;
    • 等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false;
    • 等待期间线程被中断,方法会抛出 InterruptedException 异常。

    4. shutdownNow()

    该方法与shutdown 方法不同之处在于名字中多了一个单词 Now,也就是表示立刻关闭的意思。在执行 shutdownNow 方法之后,首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,根据返回的任务 List 可以进行一些补救的操作,例如记录在案并在后期重试。

    shutdownNow()的源码如下:

    public List<Runnable> shutdownNow() { 
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
    
        try { 
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally { 
            mainLock.unlock();
        } 
     
        tryTerminate();
        return tasks;
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    源码中,interruptWorkers()方法会让每一个已经启动的线程都中断,这样线程就可以在执行任务期间检测到中断信号并进行相应的处理,提前结束任务。但是,即使调用了 shutdownNow 方法,如果被中断的线程对于中断信号不理不睬,那么依然有可能导致任务不会停止。具体应该怎么正确停止线程,可以参考:线程基础

    八、线程池的"线程复用"原理

    再介绍为什么使用线程池的时候,提到过,线程池很大的一个优点在于能够复用线程。无论是固定数量或可变数量的线程,其线程数量都远远小于任务数量,面对这种情况线程池可以通过线程复用让同一个线程去执行不同的任务。

    具体线程池时怎么实现线程复用的呢?结合源码进行分析:

    public void execute(Runnable command) { 
        if (command == null) 
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { 
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        } 
        if (isRunning(c) && workQueue.offer(command)) { 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
            else if (workerCountOf(recheck) == 0) 
                addWorker(null, false);
        } 
        else if (!addWorker(command, false)) 
            reject(command);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    execute()方法对应了线程池的整个处理流程。

    首先是通过 if 语句判断 command ,也就是 Runnable 任务是否等于 null,如果为 null 就抛出异常。

    //如果传入的Runnable的空,就抛出异常
    if (command == null) 
        throw new NullPointerException();
    
    • 1
    • 2
    • 3

    接下来判断当前线程数是否小于核心线程数,如果小于核心线程数就调用 addWorker() 方法增加一个 Worker,这里的 Worker 就可以理解为一个线程

    if (workerCountOf(c) < corePoolSize) { 
        if (addWorker(command, true)) 
            return;
            c = ctl.get();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    addWorker 方法的主要作用是在线程池中创建一个线程并执行第一个参数传入的任务,它的第二个参数是个布尔值,如果布尔值传入 true 代表增加线程时判断当前线程是否少于 corePoolSize,小于则增加新线程,大于等于则不增加;同理,如果传入 false 代表增加线程时判断当前线程是否少于 maxPoolSize,小于则增加新线程,大于等于则不增加,所以这里的布尔值的含义是以核心线程数为界限还是以最大线程数为界限进行是否新增线程的判断。addWorker() 方法如果返回 true 代表添加成功,如果返回 false 代表添加失败。

    如果当前线程数大于或等于核心线程数或者 addWorker 失败了,就通过 if (isRunning© && workQueue.offer(command)) 检查线程池状态是否为 Running,如果线程池状态是 Running 就把任务放入任务队列中,也就是 workQueue.offer(command)。

    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        // 省略...
    }
    
    • 1
    • 2
    • 3
    • 4

    再次进行线程池的运行状态检查recheck,如果线程池已经不处于 Running 状态,说明线程池被关闭,那么就移除刚刚添加到任务队列中的任务,并执行拒绝策略。

    if (! isRunning(recheck) && remove(command)) 
        reject(command);
    
    • 1
    • 2

    如果recheck线程池状态为 Running,那么当任务被添加进来之后就需要防止没有可执行线程的情况发生(比如之前的线程被回收了或意外终止了),所以此时如果检查当前线程数为 0,也就是 workerCountOf(recheck) == 0,那就执行 addWorker() 方法新建线程

    else if (workerCountOf(recheck) == 0) 
        addWorker(null, false);
    
    • 1
    • 2

    如果线程池不是 Running 状态或线程数大于或等于核心线程数并且任务队列已经满了,根据规则,此时需要添加新线程,直到线程数达到“最大线程数”,所以此时就会再次调用 addWorker 方法并将第二个参数传入 false,传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize,小于则增加新线程,大于等于则不增加,也就是以 maxPoolSize 为上限创建新的 worker;addWorker 方法如果返回 true 代表添加成功,如果返回 false 代表任务添加失败,说明当前线程数已经达到 maxPoolSize,然后执行拒绝策略 reject 方法。如果执行到这里线程池的状态不是 Running,那么 addWorker 会失败并返回 false,所以也会执行拒绝策略 reject 方法。

    else if (!addWorker(command, false)) 
        reject(command);
    
    • 1
    • 2

    execute()方法中多次用到addWorker ()方法传入任务,该方法会添加并启动一个 Worker。这里的 Worker 可以理解为是对 Thread 的包装,Worker 内部有一个 Thread 对象,它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。

    线程复用的逻辑在Worker 类中的 run 方法里执行的 runWorker 方法中,简化以后的代码如下:

    runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } finally {
                task = null;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到,实现线程复用的逻辑主要在一个不停循环的 while 循环体中:

    • 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务
    • 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)

    总结线程复用的原理:在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run 方法,把 run 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加

  • 相关阅读:
    SSG、SSR、CSR的区别
    VMware workstation 16 安装与配置
    我们为什么要做一名系统管理员?
    开题报告《基于JavaEE的数据标注项目管理系统的设计与实现》
    [go学习笔记.第十章.面向对象编程] 9.面向对象的三大特性-继承
    C++面向对象三大特性之一------多态(下)
    【Rust】简介、安装和编译
    vulnhub靶场之PYLINGTON: 1
    Service Mesh技术详解
    [DP] ABC262 D - I Hate Non-integer Number
  • 原文地址:https://blog.csdn.net/weixin_41402069/article/details/125995052