• 线程池的原理


    目录

    线程池基本概念

    Executors提供的几种线程池

    一、newCachedThreadPool

    二、newFixedThreadPool

    三、newSingleThreadExecutor

    四、newScheduledThreadPool

    JAVA自定义线程池

    ThreadPoolExecutor的基本说明

     线程执行任务过程介绍

    拒绝策略

    AbortPolicy代码演示:

    DiscardPolicy代码演示:

    DiscardOldestPolicy代码演示:

    CallerRunsPolicy代码演示

    源码解析

    execut方法

    addworker方法

    runworker方法

    getTask方法

    processWorkerExit方法


    线程池基本概念

    线程池存在的意义

    系统创建一个线程的成本是比较高的,因为它涉及到与操作系统交互,当程序中需要创建大量生存期很短暂的线程时,频繁的创建和销毁线程对系统的资源消耗有可能大于业务处理是对系统资源的消耗,这样就有点"舍本逐末"了。针对这一种情况,为了提高性能,我们就可以采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就会启动一个线程来执行该任务。等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态。等待下一次任务的执行。

    线程池的设计思路

    1. 准备一个任务容器
    2. 一次性启动多个(2个)消费者线程
    3. 刚开始任务容器是空的,所以线程都在wait
    4. 直到一个外部线程向这个任务容器中扔了一个"任务",就会有一个消费者线程被唤醒
    5. 这个消费者线程取出"任务",并且执行这个任务,执行完毕后,继续等待下一次任务的到来

    Executors提供的几种线程池

    一、newCachedThreadPool

    构造方式:

     说明:

    corePoolSize核心线程为0,说明创建的线程都是救急线程,存活时间为60秒钟,使用的任务队列是SynchronousQueue,这是一个特特殊的队列,特殊之处在于他能容纳的任务数量为0,当任务输送过来时,就进入阻塞,直到被取走执行。

    二、newFixedThreadPool

    构造方式:

     说明:创建的都时核心线程,当任务过多是就会进入linkedBlockingqueque等待,这个队列大小是int最大值

    三、newSingleThreadExecutor

    说明:只创建了一个核心线程,然后大的任务阻塞队列,同时只能处理一件事情,而较大的阻塞对了可能对导致内存溢出 

    四、newScheduledThreadPool

    说明:(后续补)

    JAVA自定义线程池

    ThreadPoolExecutor的基本说明

    由java提供的默认线程池ThreadPoolExecutor的创建方法可以看出,都是基于ThreadPoolExecutor线程池的不同参数配置产出的不同作用的线程池,所以主要讲一下ThreadPoolExecutor线程池

    构造方法:

    参数说明:

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler)
    8. corePoolSize: 核心线程的最大值,不能小于0
    9. maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
    10. keepAliveTime: 空闲线程最大存活时间,不能小于0
    11. unit: 时间单位
    12. workQueue: 任务队列,不能为null
    13. threadFactory: 创建线程工厂,不能为null
    14. handler: 任务的拒绝策略,不能为null

     线程执行任务过程介绍

    线程池中的线程是惰性创建的,仅在需要的时候才开始创建,线程池中的线程分为核心线程和非核心线程(其实就是一个标记,用来标记线程池中的最小线程数),核心线程数加非核心线程数总和不能大于设置的最大线程数。当第一个任务进来的时候,会先创建一个核心线程,用核心线程来处理任务,当第二个任务进来的时候,不管第一个任务是否已经处理完,都会再创建一个核心线程,直到核心线程的数量达到了设定的核心线程数的时候,如果再来一个任务,就会丢进任务队列里面。已经创建成功的线程,除了处理直接分配的任务之外,还会不停的循环从任务队列中获取任务,然而当核心线程已经全部都正在处理任务,这时候任务又不断的提交入任务队列中,当任务队列达到阈值之后,就会创建出非核心线程来处理任务,如果处理速度还是跟不上任务提交的速度,也就是核心线程和非核心线程都在处理任务,而任务队列也已经满了,那么就会调用拒绝策略。与核心线程相同,非核心线程处理完直接分配的任务之后,也会不断的循环拿到任务队列中的任务,任务队列中拿任务是通过poll方法,携带着时间参数,这个时间参数也是就是设定的非核心线程的过期时间,当非核心线程从队列中拿任务,等待的时间到达了设定的过期时间之后仍然没拿到任务,那么会再下一次循环中退出。然后就把非核心线程给剔除。

    拒绝策略

    四种拒绝策略简介

    RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类。

    ThreadPoolExecutor.AbortPolicy:             丢弃任务并抛出RejectedExecutionException异常。是默认的策略。
    ThreadPoolExecutor.DiscardPolicy:          丢弃任务,但是不抛出异常 这是不推荐的做法。
    ThreadPoolExecutor.DiscardOldestPolicy:    抛弃队列中等待最久的任务 然后把当前任务加入队列中。
    ThreadPoolExecutor.CallerRunsPolicy:        调用任务的run()方法绕过线程池直接执行。
     

    AbortPolicy代码演示:

    1. public class ThreadPoolExecutorDemo01 {
    2. public static void main(String[] args) {
    3. /**
    4. * 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
    5. */
    6. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
    7. new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.AbortPolicy()) ;
    8. // 提交5个任务,而该线程池最多可以处理4个任务,当我们使用AbortPolicy这个任务处理策略的时候,就会抛出异常
    9. for(int x = 0 ; x < 5 ; x++) {
    10. threadPoolExecutor.submit(() -> {
    11. System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
    12. });
    13. }
    14. }
    15. }

    控制台输出结果

    pool-1-thread-1---->> 执行了任务
    pool-1-thread-3---->> 执行了任务
    pool-1-thread-2---->> 执行了任务
    pool-1-thread-3---->> 执行了任务
    控制台报错,仅仅执行了4个任务,有一个任务被丢弃了
     

    DiscardPolicy代码演示:

    1. public class ThreadPoolExecutorDemo02 {
    2. public static void main(String[] args) {
    3. /**
    4. * 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
    5. */
    6. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
    7. new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardPolicy()) ;
    8. // 提交5个任务,而该线程池最多可以处理4个任务,当我们使用DiscardPolicy这个任务处理策略的时候,控制台不会报错
    9. for(int x = 0 ; x < 5 ; x++) {
    10. threadPoolExecutor.submit(() -> {
    11. System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
    12. });
    13. }
    14. }
    15. }

    控制台输出结果

    pool-1-thread-1---->> 执行了任务
    pool-1-thread-1---->> 执行了任务
    pool-1-thread-3---->> 执行了任务
    pool-1-thread-2---->> 执行了任务
    控制台没有报错,仅仅执行了4个任务,有一个任务被丢弃了
     

    DiscardOldestPolicy代码演示:

    1. public class ThreadPoolExecutorDemo02 {
    2. public static void main(String[] args) {
    3. /**
    4. * 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
    5. */
    6. ThreadPoolExecutor threadPoolExecutor;
    7. threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
    8. new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardOldestPolicy());
    9. // 提交5个任务
    10. for(int x = 0 ; x < 5 ; x++) {
    11. // 定义一个变量,来指定指定当前执行的任务;这个变量需要被final修饰
    12. final int y = x ;
    13. threadPoolExecutor.submit(() -> {
    14. System.out.println(Thread.currentThread().getName() + "---->> 执行了任务" + y);
    15. });
    16. }
    17. }
    18. }

    控制台输出结果

    pool-1-thread-2---->> 执行了任务2
    pool-1-thread-1---->> 执行了任务0
    pool-1-thread-3---->> 执行了任务3
    pool-1-thread-1---->> 执行了任务4
    由于任务1在线程池中等待时间最长,因此任务1被丢弃。
     

    CallerRunsPolicy代码演示

    1. public class ThreadPoolExecutorDemo04 {
    2. public static void main(String[] args) {
    3. /**
    4. * 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
    5. */
    6. ThreadPoolExecutor threadPoolExecutor;
    7. threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
    8. new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.CallerRunsPolicy());
    9. // 提交5个任务
    10. for(int x = 0 ; x < 5 ; x++) {
    11. threadPoolExecutor.submit(() -> {
    12. System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
    13. });
    14. }
    15. }
    16. }

    控制台输出结果

    pool-1-thread-1---->> 执行了任务
    pool-1-thread-3---->> 执行了任务
    pool-1-thread-2---->> 执行了任务
    pool-1-thread-1---->> 执行了任务
    main---->> 执行了任务
     

    源码解析

    核心变量:

    1. //参考ThreadPoolExecutor.java类
    2. // ctl:可以看作一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
    3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    4. // COUNT_BITS:Integer.SIZE=32,所以COUNT_BITS为29位
    5. private static final int COUNT_BITS = Integer.SIZE - 3;
    6. // CAPACITY:线程池允许的最大线程数。1左移29位,然后-1,即为2^29 -1
    7. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    8. // runState存储在高3位二进制中
    9. // 线程池5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    10. //线程池的各种状态:
    11. //用二进制表示RUNNING 1010 0000 0000 0000 0000 0000 0000 0000 运行态,可处理新任务并执 行队列中的任务
    12. private static final int RUNNING = -1 << COUNT_BITS;
    13. //用二进制表示SHUTDOWN 0000 0000 0000 0000 0000 0000 0000 0000 关闭态,不接受新任务,但 处理队列中的任务
    14. private static final int SHUTDOWN = 0 << COUNT_BITS;
    15. //用二进制表示STOP 0010 0000 0000 0000 0000 0000 0000 0000 停止态,不接受新任务,不处理队列中任务,且打断运行中任务
    16. private static final int STOP = 1 << COUNT_BITS;
    17. //用二进制表示TIDYING 0100 0000 0000 0000 0000 0000 0000 0000 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
    18. private static final int TIDYING = 2 << COUNT_BITS;
    19. //用二进制表示TERMINATED 0110 0000 0000 0000 0000 0000 0000 0000 结束态,terminated() 方法已完成
    20. private static final int TERMINATED = 3 << COUNT_BITS
    21. //任务缓存队列,用来存放等待执行的任务
    22. private final BlockingQueue workQueue;
    23. //操作许多变量都需要这个锁
    24. private final ReentrantLock mainLock = new ReentrantLock();
    25. //存放工作集,需要获取mainLock才可以操作这个变量
    26. private final HashSet workers = new HashSet();
    27. //是否允许为核心线程设置存活时间
    28. private volatile boolean allowCoreThreadTimeOut;
    29. //用来记录线程池中曾经出现过的最大线程数
    30. private int largestPoolSize;
    31. //用来记录已经执行完毕的任务个数
    32. private long completedTaskCount
    33. // runStateOf: 获取线程池状态,通过按位与操作,低29位将全部变成0
    34. private static int runStateOf(int c) { return c & ~CAPACITY; }
    35. // workerCountOf: 获取线程池worker数量,通过按位与操作,高3位将全部变成0
    36. private static int workerCountOf(int c) { return c & CAPACITY; }
    37. // ctlOf: 根据线程池状态和线程池worker数量,生成ctl值
    38. private static int ctlOf(int rs, int wc) { return rs | wc; }
    39. //线程空闲的存活时间
    40. private volatile long keepAliveTime;
    41. //核心线程是否超时剔除
    42. private volatile boolean allowCoreThreadTimeOut;
    43. //核心线程数量
    44. private volatile int corePoolSize;
    45. //总线程池数量(非核心线程数量=总线程池数量-核心线程数量)
    46. private volatile int maximumPoolSize;

    execut方法

    1. public void execute(Runnable command) {
    2. //首先进行空指针判断
    3. if (command == null)
    4. throw new NullPointerException();
    5. //取出ctl当前值
    6. int c = ctl.get();
    7. //查找出当前工作线程的数量,如果工作线程数量小于线程池的线程容量,就进入代码块
    8. if (workerCountOf(c) < corePoolSize) {
    9. //进入代码块,创建一个工作线程
    10. if (addWorker(command, true))
    11. return;
    12. c = ctl.get();
    13. }
    14. //通过isRunning方法获取当前线程的状态,如果是运行状态,那么就将任务加入队列中
    15. if (isRunning(c) && workQueue.offer(command)) {
    16. int recheck = ctl.get();
    17. //进行二次判断,如果是非运行状态,就移除任务,并调用拒绝策略
    18. if (! isRunning(recheck) && remove(command))
    19. reject(command);
    20. else if (workerCountOf(recheck) == 0)
    21. //如果当前线程池处于运行状态,并且工作线程数量为0,就创建一个空任务延长非核心线程的存活时间,避免出现有任务没线程处理的bug,保证线程处于运行状态时,必须有一个任务在执行
    22. addWorker(null, false);
    23. }
    24. //核心线程数已经用完,这时候调用非核心线程处理任务,如果失败就调用拒绝策略
    25. else if (!addWorker(command, false))
    26. reject(command);
    27. }

    addworker方法

    1. private boolean addWorker(Runnable firstTask, boolean core) {
    2. retry:
    3. for (;;) {
    4. //获取ctl的值,然后根据ctl的值获取到当前线程池的状态
    5. int c = ctl.get();
    6. int rs = runStateOf(c);
    7. // 满足一下条件就返回false
    8. //条件一:线程池的状态大于shutdown(非running)
    9. //条件二:线程池的状态等于shutdown,传入的任务为空,任务队列不为空
    10. if (rs >= SHUTDOWN &&
    11. ! (rs == SHUTDOWN &&
    12. firstTask == null &&
    13. ! workQueue.isEmpty()))
    14. return false;
    15. for (;;) {
    16. int wc = workerCountOf(c);
    17. //条件一:当工作线程数量大于系统允许的最大线程数
    18. //条件二:如果是核心线程,超过和核心线程设定容量或非核心线程超过线程池总容量
    19. //满足以上则返回false
    20. if (wc >= CAPACITY ||
    21. wc >= (core ? corePoolSize : maximumPoolSize))
    22. return false;
    23. //将ctl+1,表示线程数量+1,如果自增成功就跳出死循环
    24. if (compareAndIncrementWorkerCount(c))
    25. break retry;
    26. c = ctl.get();
    27. //如果当前的线程池状态发生了变化,那么就从头进行判断
    28. if (runStateOf(c) != rs)
    29. continue retry;
    30. }
    31. }
    32. boolean workerStarted = false;
    33. boolean workerAdded = false;
    34. Worker w = null;
    35. try {
    36. // 内部类 封装了线程和任务 通过threadfactory创建线程
    37. w = new Worker(firstTask);
    38. final Thread t = w.thread;
    39. if (t != null) {
    40. final ReentrantLock mainLock = this.mainLock;
    41. mainLock.lock();
    42. try {
    43. // 获取到当前线程池的状态
    44. int rs = runStateOf(ctl.get());
    45. //满足以下条件才能往下进行
    46. //条件一:状态为running
    47. //条件二:状态为shutdown,并且新任务为null
    48. if (rs < SHUTDOWN ||
    49. (rs == SHUTDOWN && firstTask == null)) {
    50. if (t.isAlive()) // precheck that t is startable
    51. throw new IllegalThreadStateException();
    52. //满足条件,将当前的workers加入hashset中,表明已经处于工作状态
    53. workers.add(w);
    54. int s = workers.size();
    55. if (s > largestPoolSize)
    56. largestPoolSize = s;
    57. workerAdded = true;
    58. }
    59. } finally {
    60. mainLock.unlock();
    61. }
    62. if (workerAdded) {
    63. //开始调用start方法,进行处理任务
    64. t.start();
    65. workerStarted = true;
    66. }
    67. }
    68. } finally {
    69. if (! workerStarted)
    70. addWorkerFailed(w);
    71. }
    72. return workerStarted;
    73. }

    runworker方法

    1. final void runWorker(Worker w) {
    2. Thread wt = Thread.currentThread();
    3. Runnable task = w.firstTask;
    4. w.firstTask = null;
    5. //此处解锁表示允许被中断
    6. w.unlock();
    7. boolean completedAbruptly = true;
    8. try {
    9. //不断的拿到任务处理,首先先拿到传进来的任务,如果任务为空,再尝试从阻塞队列中拿任务
    10. while (task != null || (task = getTask()) != null) {
    11. w.lock();
    12. //当线程池的状态已经大于STOP,那么就进行中断当前线程
    13. if ((runStateAtLeast(ctl.get(), STOP) ||
    14. (Thread.interrupted() &&
    15. runStateAtLeast(ctl.get(), STOP))) &&
    16. !wt.isInterrupted())
    17. wt.interrupt();
    18. try {
    19. //处理前置方法,可以用来重写,当任务在处理之前进行什么操作者
    20. beforeExecute(wt, task);
    21. Throwable thrown = null;
    22. try {
    23. //执行run方法
    24. task.run();
    25. } catch (RuntimeException x) {
    26. thrown = x; throw x;
    27. } catch (Error x) {
    28. thrown = x; throw x;
    29. } catch (Throwable x) {
    30. thrown = x; throw new Error(x);
    31. } finally {
    32. //处理后置方法,可用来重写,当run方法处理完之后可以进行的操作
    33. afterExecute(task, thrown);
    34. }
    35. } finally {
    36. //将任务设置为null,这样可以接收新的任务
    37. task = null;
    38. //完成的任务数加一
    39. w.completedTasks++;
    40. w.unlock();
    41. }
    42. }
    43. completedAbruptly = false;
    44. } finally {
    45. processWorkerExit(w, completedAbruptly);
    46. }
    47. }

    getTask方法

    1. private Runnable getTask() {
    2. //判断是否超时(超时后的线程应该被剔除掉)
    3. boolean timedOut = false;
    4. for (;;) {
    5. //获取当前ctl的值并计算出当前的线程池状态
    6. int c = ctl.get();
    7. int rs = runStateOf(c);
    8. // Check if queue empty only if necessary.
    9. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    10. decrementWorkerCount();
    11. return null;
    12. }
    13. int wc = workerCountOf(c);
    14. //判断工人是否会被淘汰
    15. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    16. //☆
    17. if ((wc > maximumPoolSize || (timed && timedOut))
    18. && (wc > 1 || workQueue.isEmpty())) {
    19. if (compareAndDecrementWorkerCount(c))
    20. return null;
    21. continue;
    22. }
    23. try {
    24. //当队列为空的时候,poll()方法会直接返回null(设置了等待时间,那么poll会等待keepAliveTime时长,如果没有则返回null),但是take()方法会一直等待
    25. //我们可以通过是否发生异常判断当前线程是否需要剔除,没发生异常 说明timed为true,允许剔除(poll方法不会抛异常),而发生异常说明timed为false,不允许剔除(take方法会抛出异常)
    26. Runnable r = timed ?
    27. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    28. workQueue.take();
    29. if (r != null)
    30. return r;
    31. //等待时长超过了设定的过期时间后仍然没取到任务,将timeOut设置成ture,然后再循环一次,在上面//☆的地方判断是否需要剔除后,满足条件则将线程数量减一
    32. timedOut = true;
    33. } catch (InterruptedException retry) {
    34. timedOut = false;
    35. }
    36. }
    37. }

    processWorkerExit方法

    1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
    2. //completedAbruptly这个字段用来判断是否是异常结束的状态,可以在runworker的方法中找到。
    3. //正常情况下,在gettask方法中,如果线程达到了剔除的条件,就会调用decrementWorkerCount()方法,将工作线程数量减一
    4. //但是是因为异常结束的状态,导致之前没有对工作线程数量减一,所以现在要进行减一操作
    5. if (completedAbruptly)
    6. decrementWorkerCount();
    7. final ReentrantLock mainLock = this.mainLock;
    8. mainLock.lock();
    9. try {
    10. //将线程池完成的任务数加上当前线程完成的任务数量
    11. completedTaskCount += w.completedTasks;
    12. //从workers中移除线程
    13. workers.remove(w);
    14. } finally {
    15. mainLock.unlock();
    16. }
    17. //线程成功被移除后尝试看是否能关闭线程池
    18. tryTerminate();
    19. int c = ctl.get();
    20. //判断线程池的状态是否小于stop(也就是running)
    21. if (runStateLessThan(c, STOP)) {
    22. if (!completedAbruptly) {
    23. //如果是非异常结束状态,那么表示线程还有可能不会被踢出(重新创建一个线程),需要在下面在进行判断
    24. //判断线程池允许的最小线程数,如果allowCoreThreadTimeOut 为true,表示核心线程超时后也可以被剔除,所以min是0
    25. //而allowCoreThreadTimeOut为false表示核心线程不能被剔除,所以min为最大核心线程数量
    26. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    27. //当线最小线数量为0,并且任务队列不为空,那么最小线程数量设置为1(必须要有一个线程来处理队列中的任务)
    28. if (min == 0 && ! workQueue.isEmpty())
    29. min = 1;
    30. //当前工作线程数量大于或等于允许的最小线程数量,那么就直接返回,到此这个线程的所有方法已经执行完毕。
    31. if (workerCountOf(c) >= min)
    32. return;
    33. }
    34. //如果执行到这里,说明还需要创建线程来处队列中的任务,那么再次调用addworker
    35. addWorker(null, false);
    36. }
    37. }

  • 相关阅读:
    白话文解析LiteFlow的理念是什么?什么时候用该怎么用?干货满满
    Day 5 C++
    WEB API 接口签名sign验证入门与实战
    494.目标和 474.一和零
    ubuntu18下安装coova-chilli
    三、日志编写 —— TinyWebServer
    SV基础知识---功能覆盖率 覆盖组、数据采样(语言部分)
    milvus的GPU索引
    【附gpt4.0升级秘笈】百度智能云万源全新一代智能计算操作系统发布:引领AI新纪元
    go泛型教程
  • 原文地址:https://blog.csdn.net/Promise_J_Z/article/details/126499828