• CachedThreadPool


    我们对CachedThreadPool可能会有以下疑问:

    1. CachedThreadPool是怎么做到创建无数个工作线程的?

    2. 为什么说CachedThreadPool可能导致导致cpu负载过高?

    3. corePoolSize和maximumPoolSize在CachedThreadPool怎样设置?

    4. keepAliveTime在CachedThreadPool怎样设置?

    5. 拒绝策略在什么条件下会生效,CachedThreadPool使用了哪种拒绝策略

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue());
    5. }

    总结:

    1. corePoolSize是0,maximumPoolSize是Integer.MAX_VALUE,根据我们之前对源码的分析,CachedThreadPool不会有核心线程,核心线程数的addworker方法不会执行,直接尝试加入queue中。

    2. keepAliveTime是60s,也就是60s从队列中没有拿到任务,worker就会自动销毁,销毁过程在fixed的分析中已经看过了

    3. queue使用的是SynchronousQueue,这个就是CachedThreadPool实现不停创建工作线程的关键,因为线程池处理任务分为三步:有任务过来先创建核心线程,当线程数达到核心线程数时则会进入等待队列,当等待队列满了则再创建非核心线程,直到线程数达到最大线程数后执行拒绝策略,所以可以猜测SynchronousQueue应该是没有容量的队列,放入一个offer操作必须有一个poll操作在等待,没有的话就执行入队失败,然后创建非核心线程进行处理

    4. 线程工厂和拒绝策略都没有指定,则使用的就是默认的,默认的拒绝策略是丢出一个异常

    SynchronousQueue

    CachedThreadPool使用了无参构造器,所以实际使用的是TransferStack,先进后出的方式。TransferStack无参构造器啥事情都没做。

    SynchronousQueue.offer(e)方法

    1. public boolean offer(E e) {
    2. if (e == null) throw new NullPointerException();
    3. return transferer.transfer(e, true, 0) != null;
    4. }

    实际上就是调用了TransferStack.transfer(e, true, 0)方法,注意参数:e=e,timed=true,nanos=0.

    offer(”张三“)

    1. if (h == null || h.mode == mode) { // empty or same-mode
    2. if (timed && nanos <= 0) { // can't wait
    3. if (h != null && h.isCancelled())
    4. casHead(h, h.next); // pop cancelled node
    5. else
    6. return null;
    7. } else if (casHead(h, s = snode(s, e, h, mode))) {
    8. SNode m = awaitFulfill(s, timed, nanos);
    9. if (m == s) { // wait was cancelled
    10. clean(s);
    11. return null;
    12. }
    13. if ((h = head) != null && h.next == s)
    14. casHead(h, s.next); // help s's fulfiller
    15. return (E) ((mode == REQUEST) ? m.item : s.item);
    16. }
    17. }

    此时h==null成立,且   if (timed && nanos <= 0) 这个if也会成立,所以直接就return null,offer方法就返回false入队失败,这样就进入了创建非核心线程的流程中了

    那什么时候会入队成功呢?

    根据我们之前的分析,woker线程在执行完firstTask之后会执行getTask方法从队列中获取任务,SynchronousQueue的keepAliveTime是60s,所以会执行poll方法

    可以看到SynchronousQueue的poll方法也是调用TransferStack.transfer(e, true, 0)方法,注意参数:e=null,timed=true,nanos=keepAliveTime.

    此时会进入else if分支,此时h=nuCachedThreadPoolll,mode=REQUEST,e=null,封装成SNode并把s设置成head。设置成功则会通过awaitFulfill方法来等待有其他线程放入任务

    awaitFulfill(SNode s, boolean timed, long nanos) 方法

    1. SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    2. final long deadline = timed ? System.nanoTime() + nanos : 0L;
    3. Thread w = Thread.currentThread();
    4. shouldSpin(s)判断需不需要自旋,如果当前节点是s或head为null或s还没有匹配到任务则需要,需要的话判断timed是不是true,那肯定是true,所以进入自旋
    5. spins=maxTimedSpins=32
    6. int spins = (shouldSpin(s) ?
    7. (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    8. for (;;) {
    9. if (w.isInterrupted())
    10. s.tryCancel();
    11. SNode m = s.match;
    12. 如果匹配到了任务则return匹配的节点m
    13. if (m != null)
    14. return m;
    15. 判断等待时间是否已到,到了则把match指针指向自己
    16. if (timed) {
    17. nanos = deadline - System.nanoTime();
    18. if (nanos <= 0L) {
    19. s.tryCancel();
    20. continue;
    21. }
    22. }
    23. 判断自旋次数,每次自旋都-1,直到shouldSpin条件不成立或到达自旋次数,如果已经自旋了 0)次还没有匹配到任务则parkNanos(this, nanos)
    24. if (spins > 0)
    25. spins = shouldSpin(s) ? (spins-1) : 0;
    26. else if (s.waiter == null)
    27. s.waiter = w; // establish waiter so can park next iter
    28. else if (!timed)
    29. LockSupport.park(this);
    30. else if (nanos > spinForTimeoutThreshold)
    31. LockSupport.parkNanos(this, nanos);

    所以当getTask()时会park当前线程等待keepAliveTime时间

    什么时候来唤醒getTask()线程呢?

    那肯定是有任务入队的时候,比如现在有线程来offer(”张三“)进入TransferStack.transfer(e, true, 0)方法,注意参数:e=e,timed=true,nanos=0.

    把当前节点封装成s,然后设置成head,获取s的next节点,实际就是之前那个park节点,然后使用tryMatch方法进行cas设置park节点match变量。设置成功则unpark之前那个REQUEST节点

     

    总结:

    1. 为什么说CachedThreadPool是一个没缓存的线程池呢,因为CachedThreadPool的核心线程数是0,所以有线程来offer元素时都会先尝试放入queue中,但CachedThreadPool使用的queue在没有其他线程在等待poll时是会入队失败的,此时就会起新的线程来执行任务,所以不会有任何任务会放入queue中进行等待,offer元素时不是之前的线程已经执行完任务从queue中poll任务就是直接起新的工作线程执行任务,所以呢CachedThreadPool不会有延迟现象产生,缺点是任务时间执行过长可能导致启动的工作线程过多导致cpu飙升

    2. SynchronousQueue主要通过offer和poll两个方法来保证queue中不保存任务,offer的时候会判断由node组成的单链表有没有poll线程在等待,如果没有线程在等待获取任务则会失败,而poll线程则会先封装成node组成单链表,然后自旋等待offer任务,如果期间没有任务则会park进行睡眠,直到有offer任务出现,当有offer任务出现时会先把自己封装成node节点,然后跟next节点进行匹对,也就是匹对poll线程,匹对成功则弹出这两个节点,完成任务的承接,这期间都是无锁操作,使用了大量cas。

    公众号同名,欢迎关注 

  • 相关阅读:
    ArrayList线程不安全解决办法
    在Spring中,标签管理的Bean中,为什么使用@Autowired自动装配修饰引用类(前提条件该引用类也是标签管理的Bean)
    模拟散列表—哈希表—拉链法;
    Flutter 使用Screen保持屏幕常亮不息屏
    高可用k8s集群(k8s-1.29.2)
    星辰天合联合星环科技完成互认证 共同打造更有生命力的大数据存算解决方案
    【解决方案】Ubuntu20.04 LTS CUDA已经安装但nvcc -V显示command not found
    在https页面,通过iframe实现http跨域访问(解决iframe页面点击浏览器刷新按钮后返回首页问题)
    GO语言gin框架实战-03-swagger和接口文档
    混合精子群优化和万有引力搜索算法 (HSSOGSA)(Matlab完整代码实现)
  • 原文地址:https://blog.csdn.net/shidebin/article/details/126818585