我们对FixedThreadPool可能会有以下疑问:
FixedThreadPool是怎么做到起固定线程数量的?
为什么说FixedThreadPool可能导致导致内存溢出?
corePoolSize和maximumPoolSize是如何起作用的?
keepAliveTime对哪些线程起作用,是如何进行检查的?
拒绝策略在什么条件下会生效,FixedThreadPool使用了哪种拒绝策略
- publicstatic ExecutorService newFixedThreadPool(int nThreads){
- returnnewThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
()); - }
- publicThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue){
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
-
- publicThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler){
- if(corePoolSize <0||
- maximumPoolSize <=0||
- maximumPoolSize < corePoolSize ||
- keepAliveTime <0)
- thrownewIllegalArgumentException();
- if(workQueue ==null|| threadFactory ==null|| handler ==null)
- thrownewNullPointerException();
- this.acc = System.getSecurityManager()==null ?
- null:
- AccessController.getContext();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
-
newFixedThreadPool实际上返回的是ThreadPoolExecutor,ThreadPoolExecutor实际就是线程池实现类,使用了典型的模板方法设计模式,通过ThreadPoolExecutor构造器的说明我们稍微解释一下各参数的意思:
corePoolSize 核心线程数,线程池一直存在的线程(即使这些线程是空闲的),除非你设置allowCoreThreadTimeOut
maximumPoolSize 线程池中允许存在的最多的线程数,当workQueue满了之后会创建线程直到数量达到maximumPoolSize
keepAliveTime 当线程池中的线程数超过核心线程数,超过部分的线程可以空闲keepAliveTime时间,如果这段时间还没有任务过来则超过部分线程就会销毁
unit keepAliveTime的单位
workQueue 等待队列,当corePoolSize线程都在处理任务时,新进入线程池的任务则翻入等待下队列
threadFactory 创建线程池中线程的线程工厂
handler 拒绝策略 当线程池已达到最大线程数和等待队列也已经满了,则使用拒绝策略
newFixedThreadPool使用了corePoolSize和maximumPoolSize相等,所以当线程池线程数到达corePoolSize之后就不会再创建线程,这就解答了我们第一个疑问
newFixedThreadPool使用了LinkedBlockingQueue,而且使用的是默认构造器,容量是Integer.MAX_VALUE,之前我们讲过LinkedBlockingQueue是一个有界阻塞队列,但如果使用无参构造器则是一个无界的了,Integer.MAX_VALUE是一个非常大的数,没达到这个数已经内存溢出了。所以我们知道了第二个疑问的答案
newFixedThreadPool的keepAliveTime是0,实际上设不设置都没关系,因为不会超过核心线程数
newFixedThreadPool的threadFactory和handler都使用了默认的,threadFactory是DefaultThreadFactory,handler是AbortPolicy,也就是丢一个异常,实际上也没什么用,因为不会到达workQueue的大小。这就回答了第五问题:拒绝策略是丢一个异常,但不会生效
- public void execute(Runnable command){
- if(command ==null)
- thrownewNullPointerException();
- int c = ctl.get();
- 获取工作线程的数量,如果工作线程数量小于核心线程数则执行addWorker方法增加核心工作线程,增加成功则执行return,失败则再次获取c的值
- if(workerCountOf(c)< corePoolSize){
- if(addWorker(command,true))
- return;
- c = ctl.get();
- }
- 判断线程池是否运行,运行则把任务压入等待队列中,放入成功之后再次获取ctl的值,如果此时线程池终止且删除等待队列中当前任务成功则调用handler拒绝任务,否则判断当前线程池工作线程是否为0,是则起一个空闲的工作线程
- if(isRunning(c)&& workQueue.offer(command)){
- int recheck = ctl.get();
- if(!isRunning(recheck)&&remove(command))
- reject(command);
- elseif(workerCountOf(recheck)==0)
- addWorker(null,false);
- }
- 放入等待队列失败则会尝试起非核心工作线程来处理任务,没成功就会执行handler拒绝策略
- elseif(!addWorker(command,false))
- reject(command);
- }
-
总结:
先会获取ctl的值,ctl是一个AtomicInteger,这里又是使用了一个Integer来保存两个变量值:Integer的高三位代表线程的状态(RUNNING:-536870912(也就是第三十位是-1),SHUTDOWN:0,STOP:536870912(也就是第三十位是1),TIDYING:1073741824(也就是低三十一位是1),TERMINATED:1610612736(也就是第三十一、第三十二是位1)),这种用一个变量表示两个状态的做法在读写锁中也出现过,在程序理解上有点难度,这样设计感觉除了装逼没太大好处
获取工作线程的数量,如果工作线程数量小于核心线程数则执行addWorker方法增加核心工作线程,增加成功则执行return,失败则再次获取c的值
判断线程池是否运行,运行则把任务压入等待队列中,放入成功之后再次获取ctl的值,如果此时线程池终止且删除等待队列中当前任务成功则调用handler拒绝任务,否则判断当前线程池工作线程是否为0,是则起一个空闲的工作线程
放入等待队列失败则会尝试起非核心工作线程来处理任务,没成功就会执行handler拒绝策略
最后回到FixedThreadPool各参数上来,此时核心线程数和最大线程数是一样的,当有任务过来时先会起工作线程进行处理,如果达到核心线程数之后则会放入LinkedBlockingQueue中,由于此时LinkedBlockingQueue的容量时Integer.MAX_VALUE,所以正常来说都队列都不会满而且都不会阻塞,所以他的起非核心线程数和拒绝策略是不会执行的
- private boolean addWorker(Runnable firstTask, boolean core) {
- 这个for循环实际上就是使用cas给ctl变量中运行线程数+1
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
-
- for (;;) {
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
-
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- 创建新的工作线程
- w = new Worker(firstTask);
- 拿到工作线程中thread变量
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- 用ThreadExecutor中的重入锁上锁
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
-
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- thread在调用start之前isAlive是false,如果是true说明已经start过了,所以这里报错
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- 放入ThreadExecutor的hashset的workers中,这里用了锁所以用线程不安全的hashset没关系
- workers.add(w);
- int s = workers.size();
- 记录largestPoolSize:线程池里面到达过的最大线程数,当我们需要知道线程池运行的一些情况时会用到
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- 工作线程添加成功则启动线程
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- start线程失败则做一些失败处理
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
-
首先自旋用cas给ctl变量中运行线程数+1,如果不满足+1的条件则直接return false
使用new Worker(firstTask)创建了一个工作线程,这里设计的比较巧妙:Worker实现了AQS继承于Runnable接口,有成员变量thread(该线程持有了当前Worker对象,因为Worker继承于Runnable接口,所以在调用thread的start方法时就会调用worker的run方法),firstTask当前work线程在执行的任务(没有任务的可能为null),completedTasks当前工作线程完成的任务数,并把AQS中的state设置为-1;
把线程放入工作线程池workers是加锁的,因为workers是HashSet,HashSet是一个线程不安全的集合,但他可以自动去重,所以此处需要加锁
加入workers成功之后就start了worker中thread,此时就会进入worker的run方法
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- 先释放锁,因为在new worker时把state设置成了-1,而在下面lock时是要从0设置成1,所以这里要先unlock,至于为什么要在add worker时设置成-1,而不直接用0,目前不是很清楚
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- 当worker线程的task不为null或则执行getTask()不为null时会一直执行while循环
- while (task != null || (task = getTask()) != null) {
- 上锁
- w.lock();
- 异常状态则中断当前线程
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- 空方法待子类扩展
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- 任务执行
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- 空方法待子类扩展
- afterExecute(task, thrown);
- }
- } finally {
- 最后把tesk设置为null,completedTasks++和释放锁
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- 如果跳出while循环了则把completedAbruptly设置成false
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
总结:
work实际是通过work自身的成员变量thread.start方法来启动runWorker方法
worker本身是通过while循环来不断执行任务,首先会先判断成员变量firstTask有没有任务,有则先执行,没有则通过getTask()从等待队列中获取任务,如果还是没有则跳出while循环,然后从workers工作线程池剔除当前work
到目前为止我们还没看到keepAliveTime起作用的代码,我想可能藏在了getTask()方法里
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
-
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
-
- int wc = workerCountOf(c);
-
- // Are workers subject to culling?
- 如果allowCoreThreadTimeOut这个参数为true(allowCoreThreadTimeOut是说核心线程数是否能过期,正常是false),或当前工作线程数大于corePoolSize,则timed为true,否则为false,也就说如果允许核心线程数过期或当前工作线程数大于核心线程数,则有时间限制
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- 当前线程数大于最大线程数或(有时间限制且已超时)且(工作线程数大于1个或队列为空)则把当前工作线程数-1并返回null
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
-
- try {
- 如果是有时间限制则poll等待keepAliveTime时间,否则使用take。
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- 拿到任务则返回任务,没有说明超时则timedOut为true
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
-
首先判断有没有超时限制(timed),条件是allowCoreThreadTimeOut参数和当前工作线程数是否大于核心线程数
有条件限制的会使用poll(keepAliveTime, TimeUnit.NANOSECONDS),没有的使用take(一直等待),所以keepAliveTime实际就是从队列拿任务的等待时长,其实也好理解,当不从等待队列中拿任务时,说明work肯定在执行firstTask,只有firstTask执行完才会执行getTask()拿等待队列中的任务,所以work的空闲时长就是等待从等待队列中拿任务的时长。所以用keepAliveTime等待拿任务就巧妙地等于work空闲的时长了
当有时间限制且已超时,就判断当前线程数是否大于1或等队列是否为空,满足一个条件就可以把工作线程数-1并返回null
这个时候就会跳出runWorker方法的while循环,然后processWorkerExit方法会把work从works中remove。整个run方法就会执行完,线程就自动退出。