• ThreadPoolExecutor源码浅析


    ThreadPoolExecutor的介绍在
    https://blog.csdn.net/Logan_addoil/article/details/97036881
    今天主要看execute 方法
    在看execute前先理解ctl

    ctl 本质就是一个int类型的数值
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //Integer.SIZE = 32 ,COUNT_BITS = 29
    //ctl 表述两个状态:
    //1. 表示线程池当前状态(高三位)
    //2. 标识线程池当前的工作线程个数(低29位)
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //表述了工作线程的最大数量(核心线程数量+非核心线程数量)
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    //线程池的五种状态 //高三位
    private static final int RUNNING = -1 << COUNT_BITS;// 111
    private static final int SHUTDOWN = 0 << COUNT_BITS;// 000
    private static final int STOP = 1 << COUNT_BITS;// 001
    private static final int TIDYING = 2 << COUNT_BITS;// 010
    private static final int TERMINATED = 3 << COUNT_BITS;// 011

    //计算出当前线程池的状态
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    //计算当前线程池的工作线程个数
    private static int workerCountOf(int c) { return c & CAPACITY; }

    .execute()执行流程

    源码:

    public void execute(Runnable command) {
        //非空判断(健壮性)
    	if (command == null)
    		throw new NullPointerException();
        //获取ctl属性
    	int c = ctl.get();
    	//判断 工作线程个数 是否小于核心线程数
    	if (workerCountOf(c) < corePoolSize) {
    	    //true表示添加一个核心线程,成功直接return,失败重新获取ctl,往下执行
    		if (addWorker(command, true))
    			return;
    		c = ctl.get();
    	}
    	//创建核心线程失败,判断当前线程池状态是否是RUNNING
    	//是RUNNING,执行offer方法将任务添加到工作队列
    	if (isRunning(c) && workQueue.offer(command)) {
    	    // 添加成功,再次获取ctl
    		int recheck = ctl.get();
    		//判断是否是RUNNING ,不是的话要将任务从工作队列中移除,执行拒绝策略
    		if (! isRunning(recheck) && remove(command))
    			reject(command);
    	    //判断工作线程数是否为0,
    		else if (workerCountOf(recheck) == 0)
    		    //工作线程数为0 但是工作队列在排队
    			//添加一个空任务的非核心线程,处理正在排队的任务
    			addWorker(null, false);
    	}
    	//添加任务到工作队列失败,添加非核心线程去执行任务
    	else if (!addWorker(command, false))
    	    //添加非核心线程失败,执行拒绝策略
    		reject(command);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    addWorker添加并启动工作线程

    源码:

    //core为true创建核心线程,false:非核心线程
    private boolean addWorker(Runnable firstTask, boolean core) {
        //外层for循环的标识
    	retry:
    	for (;;) {
    	    //获取ctl的值
    		int c = ctl.get();
    		//拿到线程池的状态
    		int rs = runStateOf(c);
    
    		// 线程池状态不是RUNNING 
    		if (rs >= SHUTDOWN &&
    		       //线程池状态时SHUTDOWN ,并且任务为空,并且工作队列不为空
    			   //满足上面的条件,就是要开始处理工作队列中的任务
    			! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())  )
    			// 只要不是RUNNING,不处理新任务,
    			// 如果是SHUTDOWN,满足了addWorker(null,false),并且工作队列有任务时,不走这步
    			return false;
    
    		for (;;) {
    		    // 基于ctl获取当前工作线程的最大值
    			int wc = workerCountOf(c);
    			// 判断工作线程是否大于最大值
    			if (wc >= CAPACITY ||
    			    //当前线程数 核心线程数 和非核心线程数
    				wc >= (core ? corePoolSize : maximumPoolSize))
    				return false;
    			// 以CAS的方式对线程数加一,成功就跳出循环
    			if (compareAndIncrementWorkerCount(c))
    				break retry;
    			//重新获取 ctl
    			c = ctl.get(); 
    			//判断线程池状态是否改变,改变需要重新判断
    			if (runStateOf(c) != rs)
    				continue retry;
    			
    		}
    	}
        //上面的判断通过,下面是添加工作线程,并启动
    	boolean workerStarted = false;
    	boolean workerAdded = false;
    	Worker w = null;
    	try {
    	    //new  Worker ,将任务扔进去启动
    		w = new Worker(firstTask);
    		//获取Worker 中的Thread
    		final Thread t = w.thread;
    		if (t != null) {
    		    // 加锁, 防止被SHUTDOWN
    			final ReentrantLock mainLock = this.mainLock;
    			mainLock.lock();
    			try {
    				// 重新获取线程池的状态,防止添加工作线程时被SHUTDOWN
    				int rs = runStateOf(ctl.get());
                    // 满足状态为 RUNNING 或者 线程池状态为SHUTDOWN,并且任务为null(创建非核心线程处理工作队列中的)
    				if (rs < SHUTDOWN ||
    					(rs == SHUTDOWN && firstTask == null)) {
    					//开始添加工作线程
    					if (t.isAlive()) 
    						throw new IllegalThreadStateException();
    					//将构建完成的 Worker 添加到 HashSet
    					workers.add(w);
    					int s = workers.size();
    					//记录最大工作线程数
    					if (s > largestPoolSize)
    						largestPoolSize = s;
    					//将添加工作线程的标识设置为true	
    					workerAdded = true;
    				}
    			} finally {
    			    //释放锁
    				mainLock.unlock();
    			}
    			if (workerAdded) {
    			    //添加成功 启动线程
    				t.start();
    				//将工作线程启动状态设置为true
    				workerStarted = true;
    			}
    		}
    	} finally {
    	    //添加工作线程失败
    		if (! workerStarted)
    			addWorkerFailed(w);
    	}
    	return workerStarted;
    }
    
    //添加工作线程失败的处理
    private void addWorkerFailed(Worker w) {
        //加锁
    	final ReentrantLock mainLock = this.mainLock;
    	mainLock.lock();
    	try {
    	    //判断添加工作线程是否成功
    		if (w != null)
    			workers.remove(w);
    		//添加工作线程失败,将工作线程数减一
    		decrementWorkerCount();
    		//尝试将线程池状态变为 TIDYING
    		tryTerminate();
    	} finally {
    		mainLock.unlock();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    Worker 对象

    // 继承了AbstractQueuedSynchronizer 关于线程的中断
    // 实现了Runnable

    private final class Worker extends AbstractQueuedSynchronizer  implements Runnable {
    	
    	private static final long serialVersionUID = 6138294804551838833L;
    
    	// 工作线程的Thread 对象,初始化时创建的
    	final Thread thread;
    	
    	Runnable firstTask;
    	
    	volatile long completedTasks;
    
    	
    	Worker(Runnable firstTask) {
    	    //刚刚初始化的线程,不允许被中断
    		setState(-1);
    		//第一次 new 的时候,会将任务赋值给 firstTask
    		this.firstTask = firstTask;
    		//给 Worker 创建 Thread 对象
    		this.thread = getThreadFactory().newThread(this);
    	}
    
    	// 调用t.start 执行当前的run方法
    	public void run() {
    		runWorker(this);
    	}
    
    	// 中断线程不是立即让线程停止的,只是将Thread 的中断标识设置为 true
    	protected boolean isHeldExclusively() {
    		return getState() != 0;
    	}
    
    	protected boolean tryAcquire(int unused) {
    	    //将中断标识从 0 置为 1
    		if (compareAndSetState(0, 1)) {
    			setExclusiveOwnerThread(Thread.currentThread());
    			return true;
    		}
    		return false;
    	}
        // 将 state 置为 0 ,表示当前线程允许被中断
    	protected boolean tryRelease(int unused) {
    		setExclusiveOwnerThread(null);
    		setState(0);
    		return true;
    	}
    
        //自己内部实现了  AQS 
    	//不允许中断 不是可重入锁
    	public void lock()        { acquire(1); }
    	public boolean tryLock()  { return tryAcquire(1); }
    	public void unlock()      { release(1); }
    	public boolean isLocked() { return isHeldExclusively(); }
    
    	void interruptIfStarted() {
    		Thread t;
    		if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    			try {
    				t.interrupt();
    			} catch (SecurityException ignore) {
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    runWoker

    // 执行任务的流程,和中断线程相关的加锁 runWorker

    final void runWorker(Worker w) {
        // 拿到当前工作线程
    	Thread wt = Thread.currentThread();
    	//拿到 Worker对象中的任务 
    	Runnable task = w.firstTask;
    	// 将 firstTask 归位
    	w.firstTask = null;
    	// 允许中断
    	w.unlock();
    	// 任务执行钩子函数中,是否出现异常
    	boolean completedAbruptly = true;
    	try {
    	    // 获取任务的第一个方式: 执行 execute,submit 时传入的任务
    		// 第二个方式: 从工作队列中获取任务
    		while (task != null || (task = getTask()) != null) {
    		    // 加锁  在 SHUTDOWN 状态下不允许被中断
    			//Worker内部实现的锁不是可重入锁,
    			//因为在中断时,也需要对worker进行lock,不能获取就代表当前工作线程正在执行任务
    			w.lock();
    			// 1.这里判断线程池状态为 STOP ,必须将当前线程中断
    			// 2.查看中断标记位,并归位,如果为 false 说明不是 STOP,如果是true,需要再次查看是否是并发操作导致线程池为STOP
    			if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&  runStateAtLeast(ctl.get(), STOP))) 
    			    && !wt.isInterrupted()) //判断当前标志位是否为false ,是就执行wt.interrupt();
    				//设置中断标记位为 true
    				wt.interrupt();
    			
    			
    			try {
    			    //(需要自己实现) 执行任务的钩子函数, 前置增强,不是动态代理
    				beforeExecute(wt, task);
    				Throwable thrown = null;
    				try {
    				    //执行任务
    					task.run();
    				} catch (RuntimeException x) {
    					thrown = x; throw x;
    				} catch (Error x) {
    					thrown = x; throw x;
    				} catch (Throwable x) {
    					thrown = x; throw new Error(x);
    				} finally {
    				    //(需要自己实现) 执行任务的钩子函数 ,后置增强,不是动态代理
    					afterExecute(task, thrown);
    				}
    			} finally {
    			    //将task 置为 null
    				task = null;
    				// 执行成功的任务个数 +1
    				w.completedTasks++;
    				// 将state 置为 0
    				w.unlock();
    			}
    		}
    		completedAbruptly = false;
    	} finally {
    	    //这里会移除工作线程
    		processWorkerExit(w, completedAbruptly);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    当前的工作线程如何拿到任务的:

    1. 直接传入的 execute,submit
    2. getTask 从工作队列中获取任务

    getTask()

    private Runnable getTask() {
        // 判断非核心线程 是否可以干掉
    	boolean timedOut = false; 
    
    	for (;;) {
    	    // 获取ctl属性
    		int c = ctl.get();
    		// 拿到线程池的状态
    		int rs = runStateOf(c);
    
    		// 线程池状态不是RUNNING ,
    		// 线程池状态大于 stop,需要移除当前工作线程
    		// 如果线程池状态为 SHUTDOWN ,并且工作队列为空
    		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    		    //移除当前工作线程(实际上是数量-1,真正的移除操作在processWorkerExit)
    			decrementWorkerCount();
    			return null;
    		}
            //判断工作线程数量
    		//拿到工作线程的数量
    		int wc = workerCountOf(c);
    
    		// allowCoreThreadTimeOut: 是否允许核心线程超时(一般都是false)
    		// 工作线程是否大于核心线程数据
    		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            //判断工作线程数大于最大线程数(基本不会发生)
    		//判断工作线程数大于核心线程数,并且当前线程已经超时
    		if ((wc > maximumPoolSize || (timed && timedOut))
    		    // 工作线程数大于 1  或者 工作队列为空 
    			// 就是表示有其他线程可以处理任务,自己可以被干掉
    			&& (wc > 1 || workQueue.isEmpty())) {
    			// 基于CAS的方式 移除掉当前线程,返回 null ,移除操作在processWorkerExit
    			if (compareAndDecrementWorkerCount(c))
    				return null;
    			continue;
    		}
     
            //从工作队列中获取任务
    		try {
    		    
    			Runnable r = timed ?
    			    //阻塞一定时间从工作队列拿任务(理解为非核心线程走这里)
    				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    				//一直阻塞 (核心线程走这里)
    				workQueue.take();
    			//如果拿到任务直接返回执行
    			if (r != null)
    				return r;
    			// 从队列中获取任务,超时了 (达到了最大生存时间)
    			timedOut = true;
    		} catch (InterruptedException retry) {
    			timedOut = false;
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    processWorkerExit

    // 移除当前的工作线程做的操作

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果执行此方法时不是因为getTask方法,而是因为直接异常引起的(一般是钩子函数)
    	if (completedAbruptly) 
    	    //直接执行方法报错,扣减工作线程数
    		decrementWorkerCount();
    
        // 加锁
    	final ReentrantLock mainLock = this.mainLock;
    	mainLock.lock();
    	try {
    	    //记录当前线程池一共多少个任务
    		completedTaskCount += w.completedTasks;
    		//移除当前任务(这里是真正移除)
    		workers.remove(w);
    	} finally {
    		mainLock.unlock();
    	}
        
    	//尝试将线程池从过渡状态->销毁状态
    	tryTerminate();
    
        // 重新获取线程池的状态,说明是 RUNNING,SHUTDOWN 
    	// 只有不是STOP状态才处理,是STOP状态有任务也不处理
    	int c = ctl.get();
    	if (runStateLessThan(c, STOP)) {
    	    // 如果是正常状态移除当前工作线程
    		if (!completedAbruptly) {
    		    // 核心线程数最小值允许多少
    			int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    			// 如果工作队列中的任务不为空,设置工作线程的最小值为1
    			if (min == 0 && ! workQueue.isEmpty())
    				min = 1;
    			//如果还有工作线程在线程池,
    			if (workerCountOf(c) >= min)
    				return; // replacement not needed
    		}
    		//如果不是正常移除,或者(工作队列不为空并且没有工作线程) 再添加一个工作线程
    		addWorker(null, false);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
  • 相关阅读:
    Java设计模式 --建造者模式【Builder Pattern】
    USACO22FEB Moo Network G
    PyG-GAT-Cora(在Cora数据集上应用GAT做节点分类)
    RabbitMQ的广播模式(fanout)在(基于xml配置)项目中使用
    华为od德科面试数据算法解析 2022-7-21 火星文计算
    【短文】在Windows显示所有当前打开的连接和监听的端口
    买卖股票的最好时机(一、二)
    接口项目实战
    算法基础-数学知识-质数、约数
    yolov5训练自己的数据集时出现的问题
  • 原文地址:https://blog.csdn.net/Logan_addoil/article/details/128052393