假设最大核心数是2,非核心线程数为1,队列长度是3
来第一个任务的时候,没有工作线程在工作,需要创建一个
来第二个任务的时候,发现当前核心线程数小于最大核心线程数,所以继续创建线程来处理任务
当来第三个任务的时候,发现当前核心线程数已经等于最大核心线程数了,所以把新来的任务放到taskQueue中
后面来第四个、第五个任务也会放在taskQueue中
当来第六个任务的时候,发现taskQueue已经满了,所以会创建一个非核心线程来处理任务
当来第七个任务的时候,因为线程数量到最大限度了,taskQueue也满了,所以就会走拒绝策略,把其中一个任务给抛弃掉,具体抛弃哪个需要根据选择的拒绝策略来定。
创建线程这里需要考虑并发的问题,即多个任务同时过来了,需要串行创建线程,否则,可能会导致超卖的情况(即创建的线程超过了最大线程数),具体是通过CAS乐观锁实现,代码解释如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获取当前线程池的控制状态。
int rs = runStateOf(c);//获取当前线程池的运行状态。
// Check if queue empty only if necessary.
//下面这个条件判断用于判断线程池是否处于关闭状态,并且任务队列不为空。如果线程池的状态大于等于SHUTDOWN,并且不满足线程池状态为SHUTDOWN、首个任务为null且任务队列为空的条件,则返回false。这个判断是为了确保在线程池关闭时,不再添加新的工作线程。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//获取当前线程池中的工作线程数量。
//这个条件判断用于判断工作线程数量是否达到上限。如果工作线程数量大于等于CAPACITY(工作线程数量的上限)或者大于等于核心线程数(如果core为true)或最大线程数(如果core为false),则返回false。这个判断是为了确保工作线程数量不超过线程池的限制。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//尝试通过CAS(比较并交换)操作增加工作线程数量。如果成功增加工作线程数量,则跳出循环,继续执行后续逻辑。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新读取当前线程池的控制状态。
//再次判断线程池的运行状态是否发生了变化。如果运行状态发生了变化,则继续重试内部循环。这个判断是为了处理在CAS操作过程中,线程池的状态发生了变化的情况。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
...创建线程的逻辑忽略...
}
花开两朵,各表一枝。
上面说了任务来了之后,是怎么创建线程的,又是怎么暂存任务的。这一节介绍一下线程是怎么执行任务的,以及在不用的时候,线程怎么被销毁或者保活。
线程池创建线程并调了thread的start方法之后,该线程会走到下面的runWorker方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
当一个线程执行完就会自动退出,但是我们知道线程池中的核心线程会一直存活着,想要一直存活,不退出程序就可以了,即循环,从上面的代码也可以看出来确实是这样的。但是还有一个疑问,核心线程是一直存活的,但是非核心线程在一定情况是会销毁的,他们用的是一套代码逻辑,该怎么实现呢?关键点就在getTask这个方法中。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//销毁线程需要满足这两个条件:1. (允许核心线程销毁 || 线程数大于核心线程数)&& 达到了销毁时间;2. 任务队列中没有任务了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// 这里返回了null,外层方法就跳出了while循环,从而结束该线程
return null;
continue;
}
try {
// 超时时间就是在这里设置的,如果允许超时销毁,那么就用poll进行拉取任务,超过了keepAliveTime就返回null。take是阻塞性等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从上面的解析可以看出,如果队列中没有任务时,小于核心数的线程(核心线程数不销毁的情况下)会一直阻塞在获取任务的方法,直到返回任务。(判断阻塞时并没有核心线程和非核心线程的概念,只要保证创建出来的线程销毁到符合预期数量就ok)。而且执行完后 会继续循环执行getTask的逻辑,不断的处理任务。
https://blog.csdn.net/lbh199466/article/details/102700780