我们对CachedThreadPool可能会有以下疑问:
CachedThreadPool是怎么做到创建无数个工作线程的?
为什么说CachedThreadPool可能导致导致cpu负载过高?
corePoolSize和maximumPoolSize在CachedThreadPool怎样设置?
keepAliveTime在CachedThreadPool怎样设置?
拒绝策略在什么条件下会生效,CachedThreadPool使用了哪种拒绝策略
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue
()); - }
-
总结:
corePoolSize是0,maximumPoolSize是Integer.MAX_VALUE,根据我们之前对源码的分析,CachedThreadPool不会有核心线程,核心线程数的addworker方法不会执行,直接尝试加入queue中。
keepAliveTime是60s,也就是60s从队列中没有拿到任务,worker就会自动销毁,销毁过程在fixed的分析中已经看过了
queue使用的是SynchronousQueue,这个就是CachedThreadPool实现不停创建工作线程的关键,因为线程池处理任务分为三步:有任务过来先创建核心线程,当线程数达到核心线程数时则会进入等待队列,当等待队列满了则再创建非核心线程,直到线程数达到最大线程数后执行拒绝策略,所以可以猜测SynchronousQueue应该是没有容量的队列,放入一个offer操作必须有一个poll操作在等待,没有的话就执行入队失败,然后创建非核心线程进行处理
线程工厂和拒绝策略都没有指定,则使用的就是默认的,默认的拒绝策略是丢出一个异常
CachedThreadPool使用了无参构造器,所以实际使用的是TransferStack,先进后出的方式。TransferStack无参构造器啥事情都没做。
- public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- return transferer.transfer(e, true, 0) != null;
- }
实际上就是调用了TransferStack.transfer(e, true, 0)方法,注意参数:e=e,timed=true,nanos=0.
- if (h == null || h.mode == mode) { // empty or same-mode
- if (timed && nanos <= 0) { // can't wait
- if (h != null && h.isCancelled())
- casHead(h, h.next); // pop cancelled node
- else
- return null;
- } else if (casHead(h, s = snode(s, e, h, mode))) {
- SNode m = awaitFulfill(s, timed, nanos);
- if (m == s) { // wait was cancelled
- clean(s);
- return null;
- }
- if ((h = head) != null && h.next == s)
- casHead(h, s.next); // help s's fulfiller
- return (E) ((mode == REQUEST) ? m.item : s.item);
- }
- }
-
此时h==null成立,且 if (timed && nanos <= 0) 这个if也会成立,所以直接就return null,offer方法就返回false入队失败,这样就进入了创建非核心线程的流程中了
根据我们之前的分析,woker线程在执行完firstTask之后会执行getTask方法从队列中获取任务,SynchronousQueue的keepAliveTime是60s,所以会执行poll方法
可以看到SynchronousQueue的poll方法也是调用TransferStack.transfer(e, true, 0)方法,注意参数:e=null,timed=true,nanos=keepAliveTime.
此时会进入else if分支,此时h=nuCachedThreadPoolll,mode=REQUEST,e=null,封装成SNode并把s设置成head。设置成功则会通过awaitFulfill方法来等待有其他线程放入任务
- SNode awaitFulfill(SNode s, boolean timed, long nanos) {
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Thread w = Thread.currentThread();
- shouldSpin(s)判断需不需要自旋,如果当前节点是s或head为null或s还没有匹配到任务则需要,需要的话判断timed是不是true,那肯定是true,所以进入自旋
- spins=maxTimedSpins=32;
- int spins = (shouldSpin(s) ?
- (timed ? maxTimedSpins : maxUntimedSpins) : 0);
- for (;;) {
- if (w.isInterrupted())
- s.tryCancel();
- SNode m = s.match;
- 如果匹配到了任务则return匹配的节点m
- if (m != null)
- return m;
- 判断等待时间是否已到,到了则把match指针指向自己
- if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- s.tryCancel();
- continue;
- }
- }
- 判断自旋次数,每次自旋都-1,直到shouldSpin条件不成立或到达自旋次数,如果已经自旋了 0)次还没有匹配到任务则parkNanos(this, nanos)
- if (spins > 0)
- spins = shouldSpin(s) ? (spins-1) : 0;
- else if (s.waiter == null)
- s.waiter = w; // establish waiter so can park next iter
- else if (!timed)
- LockSupport.park(this);
- else if (nanos > spinForTimeoutThreshold)
- LockSupport.parkNanos(this, nanos);
所以当getTask()时会park当前线程等待keepAliveTime时间
那肯定是有任务入队的时候,比如现在有线程来offer(”张三“)进入TransferStack.transfer(e, true, 0)方法,注意参数:e=e,timed=true,nanos=0.
把当前节点封装成s,然后设置成head,获取s的next节点,实际就是之前那个park节点,然后使用tryMatch方法进行cas设置park节点match变量。设置成功则unpark之前那个REQUEST节点
总结:
为什么说CachedThreadPool是一个没缓存的线程池呢,因为CachedThreadPool的核心线程数是0,所以有线程来offer元素时都会先尝试放入queue中,但CachedThreadPool使用的queue在没有其他线程在等待poll时是会入队失败的,此时就会起新的线程来执行任务,所以不会有任何任务会放入queue中进行等待,offer元素时不是之前的线程已经执行完任务从queue中poll任务就是直接起新的工作线程执行任务,所以呢CachedThreadPool不会有延迟现象产生,缺点是任务时间执行过长可能导致启动的工作线程过多导致cpu飙升
SynchronousQueue主要通过offer和poll两个方法来保证queue中不保存任务,offer的时候会判断由node组成的单链表有没有poll线程在等待,如果没有线程在等待获取任务则会失败,而poll线程则会先封装成node组成单链表,然后自旋等待offer任务,如果期间没有任务则会park进行睡眠,直到有offer任务出现,当有offer任务出现时会先把自己封装成node节点,然后跟next节点进行匹对,也就是匹对poll线程,匹对成功则弹出这两个节点,完成任务的承接,这期间都是无锁操作,使用了大量cas。