为什么要使用线程池?
减少开销,便于管理。
为了可以更清晰的去分析线程池的源码,核心属性必须掌握
ctl属性,维护这线程池状态以及工作线程个数。
- //线程池的核心属性
- // 核心属性就是ctl,如果不认识AtomicInteger,就把ctl当成int看
- // ctl一个int类型表示了线程池的2大核心内容
- // 第一个:线程池的状态
- // 第二个:工作线程个数
- // ctl是int类型,而int是32个bit位组成的数值
- // 高3位记录:线程池的状态,低29位记录:工作线程个数
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
-
- // 这个29就是为了方便对int类型数值进程分割。
- private static final int COUNT_BITS = 29;
- // 工作线程最大个数
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- 00000000 00000000 00000000 00000001
- 00100000 00000000 00000000 00000000
- 00011111 11111111 11111111 11111111
- // 高3位记录的线程池状态
- // RUNNING:属于正常状态,可以正常接收任务,并且处理任务
- private static final int RUNNING = -1 << COUNT_BITS;
- // SHUTDOWN:处理关闭状态,不接收新任务,但是会处理完线程池中之前接收到的任务
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- // STOP:处理停止状态,不接收新任务,中断正在执行的任务,之前接收的任务也不去处理
- private static final int STOP = 1 << COUNT_BITS;
- // TIDYING:过渡状态,是从SHUTDOWN或者STOP转换过来的,工作线程都凉凉了,任务也处理完了(了解)
- private static final int TIDYING = 2 << COUNT_BITS;
- // TERMINATED:终止状态,从TIDYING执行了terminated方法,转换到TERMINATED(了解)
- private static final int TERMINATED = 3 << COUNT_BITS;
-
- // 线程池最大允许有多少个工作线程?
JUC包下的Executors提供了几种JDK自带的线程池构建方式,可以不用手动去构建。
but,不推荐使用上述方式去构建线程池。
为了让咱们可以更好的去控制线程池对象,推荐直接采用new的方式去构建ThreadPoolExecutor
为了去手动new,就要对ThreadPoolExecutor对象的有参构造参数有掌握
- // 线程池的7个参数
- // 核心线程数,默认不会被干掉
- public ThreadPoolExecutor(int corePoolSize,
- // 最大线程数,在核心线程数之外的非核心线程个数,
- // maximumPoolSize - corePoolSize = 非核心线程个数
- int maximumPoolSize,
- // 最大空闲时间(一般针对非核心线程)
- long keepAliveTime,
- // 时间单位(一般针对非核心线程)
- TimeUnit unit,
- // 工作队列,核心线程个数满足corePoolSize,再来任务,就扔到工作队列
- BlockingQueue
workQueue, - // 线程工厂,为了更方面后期出现故障时,定位问题,在构建线程时,指定好
- // 一些细节信息,给个名字~~
- ThreadFactory threadFactory,
- // 拒绝策略,核心满了,队列满了,非核心到位了,再来任务走拒绝策略
- RejectedExecutionHandler handler) {
- // 省略部分代码
- }
- // 在线程池中,核心线程和非核心线程都属于工作线程。
- new ThreadPoolExecutor(5,7,1,TimeUnit.SECOND,new ArrayBlockingQueue<>(10),线程工厂,拒绝策略)
要将任务提交给线程池执行时,需要调用线程池的execute方法,将任务传递进去
- // 线程池执行任务的核心方法
- // 线程池的执行流程图就是基于这个方法画出来的。
- public void execute(Runnable command) {
- // 非空判断~
- if (command == null) throw new NullPointerException();
- // 获取ctl属性,命名c
- int c = ctl.get();
- // =======================创建核心线程===============================
- // workerCountOf(c):获取工作线程的个数
- // 如果工作线程数 小于 核心线程数
- if (workerCountOf(c) < corePoolSize) {
- // 通过addWorker方法创建 核心线程。
- // command是传递的任务,true代表核心线程
- // 如果创建工作线程成功:返回true
- // 如果创建工作线程失败:返回false
- if (addWorker(command, true))
- // 告辞,任务交给线程执行了
- return;
- // 到这,说明失败了,走下一流程
- c = ctl.get();
- }
- // =======================将任务添加到工作队列===============================
- // 判断线程池状态是不是RUNNING
- // 只有状态正常,才会走offer方法(添加任务到工作队列)
- // offer方法,添加成功返回true,添加失败返回false
- if (isRunning(c) && workQueue.offer(command)) {
- // 任务已经放到工作队列了。
- // 任务放到队列后,要重新检查一次,重新拿到ctl
- int recheck = ctl.get();
- // 判断线程池状态是不是RUNNING
- // 如果状态不是RUNNING,将刚刚放进来的任务从队列移除~~~
- if (!isRunning(recheck) && remove(command))
- // 执行拒绝策略。
- reject(command);
- // 如果状态是RUNNING,同时工作线程数是0个。
- else if (workerCountOf(recheck) == 0)
- // 如果没有工作线程,添加一个非核心,空任务线程去解决掉工作队列没人处理的任务
- addWorker(null, false);
- }
- // =======================创建非核心线程===============================
- // addWorker创建非核心线程,如果成功,返回true
- // 如果添加非核心线程失败,执行reject方法
- else if (!addWorker(command, false))
- // =======================拒绝策略===============================
- // 执行拒绝策略
- reject(command);
- }
前面就提到了添加工作线程,查看addWorker的源码
- // 添加工作线程的方式
- private boolean addWorker(Runnable firstTask, boolean core) {
- // 判断是否可以正常添加工作线程
- retry:
- for (;;) {
- //=======================判断线程池状态===================================
- // 获取ctl属性
- int c = ctl.get();
- // 获取线程状态
- int rs = runStateOf(c);
-
- // 如果线程池状态不是RUNNING(正常到这,就不能添加工作线程了)
- if (rs >= SHUTDOWN &&
- // 可能出现工作队列有任务,但是线程池没有工作线程
- // 需要添加一个非核心空任务的工作线程,这里就是。
- // 只要一个不满足,就打破了之前效果,不需要添加
- !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
- // 当前状态不允许添加工作线程,返回false
- return false;
-
- for (;;) {
- //=======================判断工作线程个数===================================
- // 阿巴阿巴~~
- int wc = workerCountOf(c);
- // 如果工作线程个数,大于线程池允许的工作线程最大数,满足,告辞,不能添加
- if (wc >= CAPACITY ||
- // core:true添加核心,false添加非核心
- // 根据不同情况,比较不同的参数
- // 如果线程数不满足new线程池时的参数,告辞,不能添加
- wc >= (core ? corePoolSize : maximumPoolSize))
- // 不能添加。
- return false;
- // 没进到if,可以添加工作线程
- // 基于CAS的方式,对ctl进行 + 1操作
- // 线程A,线程B
- if (compareAndIncrementWorkerCount(c))
- // 如果CAS成功,直接跳出外层循环,执行添加工作线程,并且启动工作线程
- break retry;
- // 重新获取ctl
- c = ctl.get();
- // 如果线程池状态不变,直接正常重新执行内部循环
- if (runStateOf(c) != rs)
- // 如果线程池状态变了,重新判断线程池状态,走外部循环。
- continue retry;
- }
- }
-
-
-
-
- // 真正的去添加工作线程,并且启动工作线程
- // 声明了三个标识
- // workerStarted:工作线程启动了吗?
- boolean workerStarted = false;
- // workerAdded:工作线程创建了吗?
- boolean workerAdded = false;
- // Worker:工作线程
- Worker w = null;
- try {
- // 创建工作线程
- w = new Worker(firstTask);
- // 获取到工作线程中的Thread
- final Thread t = w.thread;
- // 这个if几乎不会发生,除非你写ThreadFactory有问题,或者是业务没通过
- if (t != null) {
- try {
- // 拿到线程池状态
- int rs = runStateOf(ctl.get());
- // rs < SHUTDOWN:满足,代表线程池状态ok是RUNNING
- if (rs < SHUTDOWN ||
- // 状态是SHUTDOWN,任务是空,阿巴阿巴~~~
- (rs == SHUTDOWN && firstTask == null)) {
- // 除非你写ThreadFactory有问题,或者是业务没通过,线程运行了,这里就扔异常
- if (t.isAlive())
- throw new IllegalThreadStateException();
- // 添加Worker工作线程到workers。
- // private final HashSet
workers = new HashSet(); - workers.add(w);
- // 在记录工作线程的历史最大值
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- // 工作线程创建了!!!
- workerAdded = true;
- }
- }
- // 添加工作线程成功,那就启动线程
- if (workerAdded) {
- t.start();
- // 工作线程启动了!!!
- workerStarted = true;
- }
- }
- } finally {
- // 工作线程启动失败
- if (!workerStarted)
- // 将Worker从HashSet移除,并且对ctl - 1
- addWorkerFailed(w);
- }
- return workerStarted;
- }
runWorker其实就是启动工作线程做的事。
核心就是Worker对象在调用了start方法后,会执行Worker类中的run方法。
- // 工作线程干活的方法
- final void runWorker(Worker w) {
- // 获取当前线程
- Thread wt = Thread.currentThread();
- // 第一次从Worker中拿任务,赋值给task属性
- Runnable task = w.firstTask;
- // 将Worker中的任务置位空
- w.firstTask = null;
- try {
- // task != null:说明任务是最开始addWorker传递过来的。
- // (task = getTask()) != null:说明任务是从工作队列中获取到的
- while (task != null || (task = getTask()) != null) {
- // 删掉了部分catch~~~
- try {
- // 前置增强(勾子函数)
- beforeExecute(wt, task);
- try {
- // 执行任务
- task.run();
- } finally {
- // 后置增强(勾子函数)
- afterExecute(task, thrown);
- }
- } finally {
- // 任务值为空
- task = null;
- // 记录当前Worker完成了一个任务
- w.completedTasks++;
- }
- }
- }
- }
Worker工作线程除了addWorker自带的任务之外,就是从工作队列获取
- // 工作线程从工作队列获取任务
- private Runnable getTask() {
- // 超时了咩? 默认没!
- boolean timedOut = false;
- // 死循环。
- for (;;) {
- // 拿到ctl,获取线程池状态
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // 第一个:线程池状态是SHUTDOWN,并且阻塞队列为空
- // 第二个:线程池是STOP状态
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- // ctl - 1,当前线程可以销毁了~
- decrementWorkerCount();
- return null;
- }
-
- // 线程池状态正常。
-
- // 重新获取工作线程个数
- int wc = workerCountOf(c);
-
- // 回头看!
- // allowCoreThreadTimeOut:默认为false,如果为true,核心线程也要超时
- // wc > corePoolSize:工作线程大于核心线程数(现在有非核心线程)
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- // 如果工作线程大于最大线程数,基本不会满足,就是false
- // (timed && timedOut):true代表之前走了poll,但是没拿到任务
- if ((wc > maximumPoolSize || (timed && timedOut))
- // 工作线程至少2个
- // 工作队列为空
- && (wc > 1 || workQueue.isEmpty())) {
- // 干掉一个非核心线程。(循环结束拿不到任务即可)
- // CAS对ctl - 1,销毁线程
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
-
-
- try {
- // 基于阻塞队列的poll或者是take获取任务
- // poll可以指定等待多久拿任务(非核心线程)
- // take死等任务。(核心线程)
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- // 如果拿到任务,正常返回
- if (r != null)
- return r;
- // 没拿到任务,timeOut是true
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
这个没有固定的公式,虽然很多书上有设置公式,但是不一定适合你的业务。
如果要搞一个合适的线程池参数设置,你需要去动态的监控线程池,并且可以动态修改。
只能根据细粒度的测试慢慢调试一个比较合适的参数。
线程池提供了核心属性的get方法,还有核心参数动态set的功能。