• ThreadPoolExecutor源码解析


    ThreadPoolExecutor执行execute()方法示意图,如下图所示(图片均来自百度地图,懒得画图):

    百度

    在这里插入图片描述

    源码解析

          private final BlockingQueue<Runnable> workQueue;   // 工作线程队列
    
          @ReachabilitySensitive
          private final HashSet<Worker> workers = new HashSet<>();  // 工作线程集合
    
          private final ReentrantLock mainLock = new ReentrantLock();  // 锁
          private final Condition termination = mainLock.newCondition();  // 信号
    
          private int largestPoolSize;  // 线程池最大容量
          private volatile long keepAliveTime; // 存活时间长度
          private volatile boolean allowCoreThreadTimeOut;  // 允许核心线程时间长度
          private volatile int corePoolSize;  // 核心线程池大小
          private volatile int maximumPoolSize;  // 最大线程池的大小
    
    
    
          private volatile ThreadFactory threadFactory;  // 线程工厂--创建线程
    
          // 拒绝策略,当ThreadPoolExecutor已经关闭或已d经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将调用的Handler
          private volatile RejectedExecutionHandler handler;  //默认拒绝策略  AbortPolicy()
                补充: AbortPolicy: 拒绝任务时直接抛出异常,让你感知到任务被拒绝
                      DiscardPolicy: 当新任务被提交后直接丢弃掉,也不会有任何通知
                      DiscardOldestPolicy: 如果线程池没被关闭且没有执行能力,则会丢弃任务队列的头结点,这种策略与上面
                         的策略不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的
                      CallerRunPolicy: 相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则
                         把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处。
                         第一点新提交的任务不会被丢弃,这样也就不会造成业务损失。
                         第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务
                         又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,
                         相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定
                         的空间,相当于是给了线程池一定的缓冲期。
    
          private final class Worker extends AbstractQueuedSynchronizer
    
          //执行线程
          public void execute(Runnable command){
               if(command==null){
                    throw new NullPointerException();
               }
               int c = ctl.get()  // ctl = new AtomicInteger(ctlOf(RUNNING, 0));  原子操作
               if(workerCountOf(c)<corePoolSize){   // 1. 是否小于核心线程数
                   // 小于核心线程数目 则 通过 ThreadFactory 创建新线程
                   if(addWorker(command,true)) return;
                   c = ctl.get();
               }else{
                  // 大于核心线程数  则  添加到 workQueue 中
                  if(isRunning(c) && workQueue.offer(command){
                       int recheck  = ctl.get();
                       if(!isRunning(recheck) && remove(command)){
                           reject(command);
                       }else if(workerCountOf(recheck)==0){
                           addWorker(null,false);
                       }
                  }else if(!addWorker(command,false){   //添加到
                      reject(command);
                  }
               }
          }
    
          public boolean remove(Runnable task) {
              boolean removed = workQueue.remove(task);
              tryTerminate(); // In case SHUTDOWN and now empty
              return removed;
          }
    
          addWorker(Runnable firstTask,boolean core){
              retry:  // 计数,并且根据是否 core 判断 是否 大于 corePoolSize 或者 maximumPoolSize
              for(;;){
                for(;;){
                }
              }
              boolean workerStarted = false;
              boolean workerAdded = false;
              Worker w = null;
              try{
                  w = new Worker(firstTask);
                  Thread t = w.thread;
                  if(t!=null){
                     final ReentrantLock mainLock = this.mainLock;
                     mainLock.lock();  // 锁住
                     try{
                        workers.add(w);
                        workerAdded = true;
                     }finally{
                        mainLock.unLock();  //解锁
                     }
                     if(workerAdded){
                        t.start();
                        workerStarted = true;
                     }
                  }
              }finally{
                  if(!workerStarted){
                      addWorkerFailed(w);
                  }
              }
              return workerStarted;
          }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
  • 相关阅读:
    使用Docker发布部署C# .NET core WebApi到服务器
    yara
    浅谈C++|STL之算法函数篇
    MySQL 多表查询 事务 索引
    TFT-LCD显示中英文
    服务器感染的病毒有哪些特点呢?
    在ASF中使用On Demand生产DEM等产品时使用不同参考DEM的区别
    使用 ClickHouse 深入了解 Apache Parquet (二)
    经典面试题-lock与synchronized异同点
    什么是正负样本
  • 原文地址:https://blog.csdn.net/qq_35920289/article/details/127870364