• 【Java技术专题】「并发编程系列」深入分析线程池的工作原理(上篇)


    线程池继承关系

    ThreadPoolExecutor实现的顶层接口是Executor,在接口Executor中用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中,由Executor框架完成线程的调配和任务的执行部分。

    ExecutorService接口增加了一些能力

    • 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法
    • 提供了管控线程池的方法,比如停止线程池的运行

    AbstractExecutorService则是上层的抽象类

    将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可

    ThreadPoolExecutor实现最复杂的运行部分:

    可以自动创建、管理和复用指定数量的一组线程,适用方只需提交任务即可线程安全,ThreadPoolExecutor内部有状态、核心线程数、非核心线程等属性,广泛使用了CAS和AQS锁机制避免并发带来的冲突问题

    提供了核心线程、缓冲阻塞队列、非核心线程、抛弃策略的概念,可以根据实际应用场景进行组合使用

    提供了beforeExecute 和afterExecute()可以支持对线程池的功能进行扩展

    线程池的优点

    降低线程创建和销毁线程造成的开销

    • 提高响应速度:任务到达时,相对于手工创建一个线程,直接从线程池中拿线程,速度肯定快很多
    • 提高线程可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统稳定性,使用线程池可以进行同意分配、调优和监控。

    构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    • corePoolSize:线程池的核心线程数,一般情况下不管有没有任务都会一直在线程池中一直存活,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)设置为true时,闲置的核心线程会存在超时机制,如果在指定时间没有新任务来时,核心线程也会被终止,而这个时间间隔由第3个属性keepAliveTime指定。
    • maximumPoolSize:线程池所能容纳的最大线程数,当活动的线程数达到这个值后,后续的新任务将会被阻塞。
    • keepAliveTime:控制线程闲置时的超时时长,超过则终止该线程。一般情况下用于非核心线程,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)设置为true时,也作用于核心线程。
    • unit:用于指定keepAliveTime参数的时间单位,TimeUnit是个enum枚举类型,常用的有:TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。
    • workQueue:线程池的任务队列,通过线程池的execute(Runnable command)方法会将任务Runnable存储在队列中。
    • threadFactory:线程工厂,它是一个接口,用来为线程池创建新线程的。
    • handler:拒绝策略,所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。

    成员变量

    /**
     * 任务阻塞队列 
     */
    private final BlockingQueue<Runnable> workQueue; 
    /**
     * 非公平的互斥锁(可重入锁)
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    /**
     * 线程集合一个Worker对应一个线程,没有核心线程的说话,只有核心线程数
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    /**
     * 配合mainLock通过Condition能够更加精细的控制多线程的休眠与唤醒
     */
    private final Condition termination = mainLock.newCondition();
    /**
     * 线程池中线程数量曾经达到过的最大值。
     */
    private int largestPoolSize;  
    /**
     * 已完成任务数量
     */
    private long completedTaskCount;
    /**
     * ThreadFactory对象,用于创建线程。
     */
    private volatile ThreadFactory threadFactory;  
    /**
     * 拒绝策略的处理句柄
     * 现在默认提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 线程池维护线程(超过核心线程数)所允许的空闲时间
     */
    private volatile long keepAliveTime;
    /**
     * 允许线程池中的核心线程超时进行销毁
     */
    private volatile boolean allowCoreThreadTimeOut;  
    /**
     * 线程池维护线程的最小数量,哪怕是空闲的  
     */
    private volatile int corePoolSize;
    /**
     * 线程池维护的最大线程数量,线程数超过这个数量之后新提交的任务就需要进入阻塞队列
     */
    private volatile int maximumPoolSize;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    创建线程池

    缓存程线程池(不会存放队列,一直创建线程)

    核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程
    
    • 1

    newCachedThreadPool是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。

    会出下面大量的线程对象,导致的OOM

    public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                      60L, TimeUnit.SECONDS,
                      new SynchronousQueue<Runnable>());
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    执行流程

    • 先执行SynchronousQueue的offer方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueue的poll方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程,否则,配对失败,创建新的线程去处理任务
    • 当线程池中的线程空闲时,会执行SynchronousQueue的poll方法等待执行SynchronousQueue中新提交的任务。若等待超过60s,空闲线程就会终止


    使用场景

    执行大量短生命周期任务因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务


    单线程线程池(只会运行一个线程,否则一直会堆积到阻塞队列)

    它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景,SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,使用无界队列LinkedBlockingQueue作为线程池的工作队列

    newSingleThreadExecutor 创建是一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它此线程池保证所有任务的执行顺序按照任务的提交顺序执行

    • 当线程池中没有线程时会创建一个新线程来执行任务
    • 当前线程池中有一个线程后将新任务加入LinkedBlockingQueue
    • 线程执行完第一个任务后会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行 。

    使用场景:

    **适用于串行执行任务场景**
    
    • 1

    会存在出现阻塞队列堆积过大,导致的OOM

    public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    固定大小线程池(会运行指定数量的线程,否则一直会堆积到阻塞队列)

     corePoolSize等于maximumPoolSize,所以线程池中只有核心线程,使用无界阻塞队列LinkedBlockingQueue作为工作队列
    
    • 1

    使用场景

    适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务


    newFixedThreadPool创建固定大小的线程池每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。

    会存在出现阻塞队列堆积大,导致的OOM

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                      0L, TimeUnit.MILLISECONDS,
                      new LinkedBlockingQueue<Runnable>(),
                      threadFactory);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    定时任务线程池(任务队列与最大值均为无限大小,一直堆积到阻塞队列)

    newScheduledThreadPool 创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。

    线程总数阈值为Integer.MAX_VALUE,工作队列使用DelayedWorkQueue,非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。

    会存在出现阻塞队列堆积过大,导致的OOM

    public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
      return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                       ThreadFactory threadFactory) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    可以看出来上面的方法一共使用了DelayedWorkQueueLinkedBlockingQueueSynchronousQueue。这个就是线程核心之一的阻塞队列。

    两种方式提交任务:

    • scheduleAtFixedRate: 按照固定速率周期执行
    • scheduleWithFixedDelay:上个任务延迟固定时间后执行

    任务阻塞队列

    它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;

    SynchronousQueue

    直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。

    一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态


    SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;


    ArrayBlockingQueue

    有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示:
    
    • 1
    new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    
    • 1

    使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。


    在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

    LinkedBlockingQueue

    无界的任务队列:无界任务队列可以使用LinkedBlockingQueue实现,如下所示:
    
    • 1
     new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    
    • 1

    使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。


    PriorityBlockingQueue

    优先任务队列:优先任务队列通过PriorityBlockingQueue实现,使用平衡二叉树堆,实现的具有优先级的无界阻塞队列
    
    • 1

    任务会按优先级重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

    PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行

    其实LinkedBlockingQueue也是可以设置界限的,它默认的界限是Integer.MAX_VALUE。同时也支持也支持构造的时候设置队列大小

    DelayQueue

    无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。

    拒绝策略

    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    
    • 1
    • 2
    • 3

    当Executor已经关闭,即执行了executorService.shutdown()方法后,或者Executor将有限边界用于最大线程和工作队列容量,且已经饱和时。使用方法execute()提交的新任务将被拒绝. 在以上述情况下,execute方法将调用其RejectedExecutionHandler的rejectExecution()方法

    RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
    
    • 1

    AbortPolicy(默认的拒绝策略)

    也称为终止策略,遭到拒绝将抛出运行时RejectedExecutionException。业务方能通过捕获异常及时得到对本次任务提交的结果反馈。

    public static class AbortPolicy implements RejectedExecutionHandler {
      public AbortPolicy() { }
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    CallerRunsPolicy

    拥有自主反馈控制,让提交者执行提交任务,能够减缓新任务的提交速度。这种情况是需要让所有的任务都执行完毕。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    DiscardPolicy

    拒绝任务的处理程序,静默丢弃任务。使用此策略,我们可能无法感知系统的异常状态。慎用~!

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    DiscardOldestPolicy

    丢弃队列中最前面的任务,然后重新提交被拒绝的任务。是否要使用此策略需要看业务是否需要新老的替换,慎用~!(LRU)

    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运作执行流程

    1. 判断线程池中核心线程数是否已达阈值corePoolSize,若否,则创建一个新核心线程执行任务
    2. 若核心线程数已达阈值corePoolSize,判断阻塞队列workQueue是否已满,若未满,则将新任务添加进阻塞队列
    3. 若满,再判断,线程池中线程数是否达到阈值maximumPoolSize,若否,则新建一个非核心线程执行任务。若达到阈值,则执行线程池饱和策略。

    线程池饱和策略分为一下几种:

    • AbortPolicy:直接抛出一个异常,默认策略
    • DiscardPolicy: 直接丢弃任务
    • DiscardOldestPolicy:抛弃下一个将要被执行的任务(最旧任务)
    • CallerRunsPolicy:主线程中执行任务

    合理配置线程池大小

     要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
    
    • 1

    • 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
    • 任务的优先级:高,中和低。
    • 任务的执行时间:长,中和短。
    • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

    根据任务所需要的cpu和io资源的量可以分为,
    
    • 1
    • CPU密集型任务:  主要是执行计算任务,响应时间很快,cpu一直在运行,这种任务cpu的利用率很高。
    • IO密集型任务:主要是进行IO操作,执行IO操作的时间较长,这是cpu出于空闲状态,导致cpu的利用率不高。

    为了合理最大限度的使用系统资源同时也要保证的程序的高性能,可以给CPU密集型任务和IO密集型任务配置一些线程数。
    
    • 1
    • CPU密集型:线程个数为CPU核数。这几个线程可以并行执行,不存在线程切换到开销,提高了cpu的利用率的同时也减少了切换线程导致的性能损耗
    • IO密集型:线程个数为CPU核数的两倍。到其中的线程在IO操作的时候,其他线程可以继续用cpu,提高了cpu的利用率。
  • 相关阅读:
    AI 音辨世界:艺术小白的我,靠这个AI模型,速识音乐流派选择音乐 ⛵
    这个神器,让我的 Python 代码运行速度快了100倍
    CleanMyMac X免费电脑清理加速软件-清理内存磁盘缓存注册表
    Java语法基础案例
    selenium4.0以上元素被定位
    MySQL 5.7限制general_log日志大小
    Spring Bean 的作用域(Bean Scope)
    物联网应用技术专业是属于什么类
    C/C++-内存
    提升团队能力的真正利器不是培训而是复盘,
  • 原文地址:https://blog.csdn.net/l569590478/article/details/127817208