• 线程池源码解析 2.工作原理与内部结构


    线程池源码解析—工作原理与内部结构

    工作原理

    概述

    image-20221113110625730

    • 线程池是线程的池子,本质上是通过单个线程执行多个并发任务,使得尽量少的创建线程,减少开销。
    • 在线程池内部,是没有区分核心线程和非核心线程的,是通过 Set 集合的大小来进行区分的。

    线程池状态

    • 线程池有以下几种状态,在不同的状态之间,有着不同的处理逻辑,在代码中,有着大量的判断逻辑:

    image-20221017155420528

    拒绝策略

    image-20221017155839747

    image-20221017155924290

    • 以上是 JDK 提供的一些拒绝策略,这四个用的比较多的是第一种 AbortPolicy,也是默认拒绝策略。

    • 而我们在业务开发过程中,往往会自定义线程池拒绝策略进行处理。

    • 线程池拒绝策略接口

      public interface RejectedExecutionHandler {
          
      	void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
      }
      
      • 1
      • 2
      • 3
      • 4

      image-20221018204515142

    ThreadPoolExecutor 内部结构

    核心属性之 ctl

        /*
         * 线程池核心属性之一 ctl
         * 高3位表示当前线程池运行状态,低29位表示当前线程池中所拥有的线程数量
         * 是一个原子类 AtomicInteger
         */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    	/*
    	 * 表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位
    	 * Integer.SIZE = 32 => 32 - 3 = 29 表示低29位用来存放当前线程数量的位
    	 */
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
    	/*
    	 * 表示低29位能表示的最大的线程数 就是 1 << 29 - 1 (大概是5亿多)
    	 * CAPACITY = 000 11111111111111111111111111111
    	 */
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    	
    	/*
    	 * 下面的表示线程池的5种状态
    	 * 状态从上到下依次递增
    	 */
        // 111 00000000000000000000000000000(二进制) 转换成10进制是一个负数
        private static final int RUNNING    = -1 << COUNT_BITS;
    
    	// 000 00000000000000000000000000000 
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
    	// 001 00000000000000000000000000000 
        private static final int STOP       =  1 << COUNT_BITS;
    
    	// 010 00000000000000000000000000000 
        private static final int TIDYING    =  2 << COUNT_BITS;
    
    	// 011 00000000000000000000000000000 
        private static final int TERMINATED =  3 << COUNT_BITS;
    	
    	/*
    	 * 获取当前线程池的运行状态
    	 * CAPACITY = 000 11111111111111111111111111111 取反后 => ~CAPACITY = 111 00000000000000000000000000000
    	 * 因为要进行一个&运算,而~CAPACITY的值是固定的,根据这个值并且我们知道ctl的高三位
    	 * 表示线程池的运行状态,所以进行&运算后就能获取到ctl的高三位的状态,即线程池的状态
    	 * c = ctl = 111 00000000000000000000000000111(表示当前线程池RUNNING状态 并且有7个线程)
    	 *                            &
    	 *           111 00000000000000000000000000000
    	 *                            =
    	 *           111 00000000000000000000000000000(最终只会保留高三位 即线程池的状态)
    	 */
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        
    	/*
    	 * 获取当前线程池的线程数量
    	 * CAPACITY = 000 11111111111111111111111111111
    	 * 这个值跟ctl进行&运算,取出ctl的低29位的值,即表示获取线程池中的线程数量
    	 */
    	private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    	/* 
    	 * 用在重置当前线程池ctl值时会用到 
    	 * rs 表示线程池状态, wc表示当前线程池中worker(线程)数量
    	 * |表示的就是不进位加法 表示的就是通过rs 和 wc重新构建一个ctl
    	 * 111 000000000000000000
         * 000 000000000000000111
         * 111 000000000000000111
    	 */
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    	
    	/*
    	 * 表示当前线程池ctl所表示的状态是否小于某个状态s
    	 * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    	 */
    	private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    	
    	/*
    	 * 表示当前线程池ctl所表示的状态是否大于等于某个状态s
    	 */
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    	
    	/*
    	 * 判断线程池是否处于RUNNING状态
    	 * 小于SHUTDOWN的状态一定是RUNNING状态 
    	 */
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    常用简单方法

      	/*  
      	 * 使用CAS的方式让 ctl值+1,成功返回true 失败返回false
      	 * 即尝试添加一个线程(Worker实际上就是工作者线程)
      	 */
    	private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        /*
         * 使用CAS的方式让 ctl值-1,成功返回true 失败返回false
      	 * 即尝试干掉一个线程(Worker实际上就是工作者线程)
         */
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
    
        /*
    	 * 将ctl的值-1, 这个方法一定成功,使用的是 自旋 + CAS 的方式保证
         */
        private void decrementWorkerCount() {
            do {} while (!compareAndDecrementWorkerCount(ctl.get()));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    核心成员属性

        /*
         * 线程池全局锁
         * 增加worker(线程) 减少worker 时需要持有mainLock,修改线程池运行状态时也需要
         */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        // 线程池中真正存到 worker(Thread)的地方 工作者集合
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        /*
         * 条件队列
         * 当外部线程调用awaitTermination()方法时,外部线程会阻塞等待当前线程池状态为Termination为止
         * 底层类似AQS的原理,等待就是将当前线程封装成一个Node,然后进入Condition的等待队列中(线程被park)。当线程池状态变为termination时,
         * 会通过调用termination.signalAll()将这些线程全部唤醒,进入到阻塞队列中(AQS),继续去争抢锁(每次只有头节点可以获得锁)
         * 抢占到的线程,会继续执行awaitTermination() 后面程序。这些线程最后,都会正常执行。
         * 简单理解:termination.await() 会将线程阻塞在这
         *          termination.signalAll() 会将阻塞在这的线程依次唤醒
         */
        private final Condition termination = mainLock.newCondition();
    
        // 记录线程池生命周期内,线程数的最大值
        private int largestPoolSize;
    
        // 记录线程池所完成的任务总数,当一个worker退出时,会将worker完成的任务累加到这个属性中
        private long completedTaskCount;
    	
    	/* 
     	 *  线程池7大核心参数之一:任务队列:BlockingQueue(阻塞队列)是一个接口
      	 *  当线程池中的正在工作的线程达到核心线程数时,这时再提交的任务会直接放到workQueue中
      	 *  常用的实现类有基于数组的阻塞队列  ArrayBlockingQueue
      	 *  		     基于链表的阻塞队列  LinkedBlockingQueue 
     	 */
        private final BlockingQueue<Runnable> workQueue;
    
        /*
         * 线程池的7大参数之一,线程的创建工厂,创建线程时会使用,是一个接口
         * 当我们使用 Executors.newFix...  newCache... 创建线程池时,使用的是 DefaultThreadFactory
         * 一般不推荐使用默认的实现类DefaultThreadFactory,推荐自己实现ThreadFactory
         */
        private volatile ThreadFactory threadFactory;
    
        /*
         * 线程池7大核心参数之一,拒绝策略,是一个接口,有四种实现,默认是直接丢弃并抛出异常
    	 * DiscardOldestPolicy   ---> 丢弃队列中最老(最先入队)的任务
    	 * AbortPolicy           ---> 直接丢弃新来的任务 抛出异常 (默认的)
    	 * CallerRunsPolicy      ---> 直接调用run方法,相当于同步方法
    	 * DiscardPolicy         ---> 直接丢弃新来的任务 不抛出异常
         */
        private volatile RejectedExecutionHandler handler;
    
    	/*
    	 * 线程池7大核心参数之一:空闲线程存活时间
    	 * 当allowCoreThreadTimeOut为false时,只有当非核心线程空闲时间达到指定时间时才会被回收
    	 * 当allowCoreThreadTimeOut为true时,线程池内所有的线程到达指定的时间均会被回收
    	 * 此参数常常和 TimeUnit一起使用,指定超时时间的单位(也是线程池的7大核心参数之一) 
    	 */
        private volatile long keepAliveTime;
    
    	// 控制线程池内核心线程空闲时间达到指定时间时能否被回收 true 可以 false不可以
        private volatile boolean allowCoreThreadTimeOut;
    	
    	/*
    	 * 线程池7大核心参数之一:核心线程数
    	 */
        private volatile int corePoolSize;
    
        /*
         * 线程池7大核心参数之一:最大线程数
         */
        private volatile int maximumPoolSize;
    
        // 缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式
        private static final RejectedExecutionHandler defaultHandler =
                new AbortPolicy();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    核心内部类 Worker

    	/*
         * Worker采用了AQS的 独占 模式
         * 独占模式:两个重要属性 state 和 ExclusiveOwnerThread
         * state:0时表示表示未被占用,> 0时表示被占用,< 0时表示初始状态,这种情况下不能被抢锁,只有等于0时才能尝试去抢锁
         * ExclusiveOwnerThread表示独占(抢到)锁的线程
         */		
    	private final class Worker
            extends AbstractQueuedSynchronizer // 是AQS的子类
            implements Runnable // 实现了Runnable接口
        {
    
            private static final long serialVersionUID = 6138294804551838833L;
    
            // worker内部封装的工作线程
            final Thread thread;
            
        	// 假设firstTask不为空,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到队列中去获取下一个任务	
            Runnable firstTask;
    
            // 记录当前worker所完成的任务数量
            volatile long completedTasks;
    
            /*
             * 构造器 传来的Runnable任务可以为null,firstTask为null的线程启动后会去队列中获取任务
             */
            Worker(Runnable firstTask) {
                // 设置AQS独占模式为初始化中状态,这个时候不能被抢占锁
                setState(-1); 
                // 为内部的firstTask赋值
                this.firstTask = firstTask;         
                /*
                 * 使用线程工厂创建了一个线程,并且将当前worker指定为Runnable,也就是说当thread启动的时候会议worker.run为入口
                 */
                this.thread = getThreadFactory().newThread(this);
            }
    
            /*
             * 当worker启动时,会执行run()方法 当前的这个Worker就是一个任务(Runnable)
             * 底层调用runWorker()直接将this传入了
             */
            public void run() {
                // 直接将当前对象传入进行执行
                // ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入
                runWorker(this);
            }
    
    		/*
    		 * 判断当前worker的独占锁是否被占用
    		 * state为0 表示为被占用
    		 * state为1 表示被占用
    		 */
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    		
        	// 尝试去占用worker的独占锁 
            // 返回值 表示是否抢占成功
            protected boolean tryAcquire(int unused) {
                // CAS的方式,将state设置为1,尝试抢占锁
                if (compareAndSetState(0, 1)) {
                    // CAS成功,成功抢到锁,则将exclusiveOwnerThread设置为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    		
        	/*  
        	 * 尝试释放锁 
             * 外部不会直接调用这个方法,这个方法是 AQS内调用的
             * 外部调用unlock时,unlock -> AQS.release -> tryRelease (模板方法模式)
             */
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    		
        	// 加锁,加锁失败时会阻塞当前线程(类似ReentarntLock),直到获取到锁
            public void lock()        { acquire(1); }
        
        	/*
             * 尝试去加锁,如果锁是未被持有状态,那么加锁成功后会返回true
        	 * 否则 不会阻塞当前线程 直接返回false
        	 */ 
            public boolean tryLock()  { return tryAcquire(1); }
        
            /*
             * 释放锁
        	 * 一般情况下,咱们调用unlock 要保证 当前线程是持有锁的
             * 特殊情况,当worker的 state == -1 时,调用unlock 表示初始化state 设置state == 0
             * 启动worker之前会先调用unlock()这个方法 会强制刷新ExclusiveOwnerThread == null state==0 之后看源码就明白了。
             */
            public void unlock()      { release(1); }
        
        	// 返回当前worker的lock是否被占用
            public boolean isLocked() { return isHeldExclusively(); }
    		
            // 回头再说
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109

    构造方法

    • 其余的 3 个构造方法都是套娃下面的这个构造方法,所以我们直接看这个最核心的即可。
    	/*
    	 * 7个参数的构造方法 传入7大核心参数,为内部属性赋值
    	 */  	
    	public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                                  int maximumPoolSize, // 最大线程数
                                  long keepAliveTime, // 空闲线程存活时间
                                  TimeUnit unit, // 时间单位 seconds nano..
                                  BlockingQueue<Runnable> workQueue, // 阻塞(任务)队列
                                  ThreadFactory threadFactory, // 线程工厂
                                  RejectedExecutionHandler handler) { // 拒绝策略
          	// 判断参数是否合法
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
           
            // 工作队列 和 线程工厂 和 拒绝策略 都不能为空
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
          	// 为属性赋值
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    参考

  • 相关阅读:
    【创建型】生成器模式(Builder)
    kafka的安装和基本操作
    项目构建工具maven的基本配置+idea 中配置 maven
    初学者入门的可视化超级色彩公式
    Unity小技巧——清空所有事件中订阅的方法
    Flowable-6.7.2:数据库详情
    Docker已存在Nginx容器对宿主机映射容器的目录进行修改,完成不同前端项目的部署
    File 和 InputStream, OutputStream 的用法
    xml笔记
    acwing算法提高之图论--最小生成树的典型应用
  • 原文地址:https://blog.csdn.net/weixin_53407527/article/details/127829787