• 【架构师技能篇】Worker初识


    一、Worker的真实身份

    Worker是线程池的内部类。包装了一个线程来不断的执行任务。线程池中的每个线程都被包装成了Worker类,它具有以下特点:

    1、它实现了Runnable,内部线程start后,执行的就是Worker本身的run方法。

    2、它继承了AQS,实现了一个不可重入的独占锁,执行任务期间加锁。线程池可以通过tryLock来判断Worker是不是正在执行任务。

    二、Worker的创建与销毁

    1、Worker的创建

    线程池在接收提交任务阶段,会根据当前线程数量,决定是否创建线程。如下代码:

    public void execute(Runnable command) {
           if (command == null)
               throw new NullPointerException();
           int c = ctl.get();
           if (workerCountOf(c) < corePoolSize) {
               if (addWorker(command, true))
                   return;
               c = ctl.get();
           }
           if (isRunning(c) && workQueue.offer(command)) {
               int recheck = ctl.get();
               if (! isRunning(recheck) && remove(command))
                   reject(command);
               else if (workerCountOf(recheck) == 0)
                   addWorker(null, false);
           }
           else if (!addWorker(command, false))
               reject(command);
       }

    通过上面的源代码,我们会发现有三个地方调用addWorker()方法,而此方法的内部则会创建Worker对象,并完成提交任务的执行。下面,来看看Worker的内部结构:

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
       
       private static final long serialVersionUID = 6138294804551838833L;

       /** 线程池里的真正的线程,负责执行提交的任务。*/
       final Thread thread;
       /**Worker线程执行的第一个任务,如果创建的是核心线程的Worker则提交的任务会赋值给此属性。此值可为空*/
       Runnable firstTask;
       /** 当前线程已完成的任务数,每次执行玩一个提交的任务,此值+1 */
       volatile long completedTasks;
    }  

    接下来,我们看下构造函数:

    Worker(Runnable firstTask) {
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
    }

    构造函数中,会把当前状态设置为-1,此状态来源于AbstractQueuedSynchronizer。表示锁是否被抢占,此处设置为-1,是为了让抢锁操作失败,不会导致刚刚创建就被中断,直到执行了runWorker。

    firstTask 为当前提交的任务

    thread  通过线程池构建时提供的线程工厂创建。此处需要注意,在newThread()时,把this当作传入参数,此时的this代表的是当前new出来的Worker对象。类似于new Thread(new Runnabel()),所以可以推断,当thread调用start方法时,会调用Worker的run()方法。

    了解完worker的内部结构,我们继续回看addWorker()方法,我们会从里面找到new Worker(),并且获取到Worker里的Thread,并启动线程。如下图:

    Worker w = null;
    try {
    // new 一个Worker对象,并将当前任务当作参数传给worker
       w = new Worker(firstTask);
       // 获取到worker里封装的Thread对象,即通过getThreadFactory().newThread(this)创建的Thread
       final Thread t = w.thread;
       if (t != null) {
           final ReentrantLock mainLock = this.mainLock;
           mainLock.lock();
           try {
               int rs = runStateOf(ctl.get());
               if (rs < SHUTDOWN ||
                   (rs == SHUTDOWN && firstTask == null)) {
                   if (t.isAlive()) // precheck that t is startable
                       throw new IllegalThreadStateException();
                   workers.add(w);
                   int s = workers.size();
                   if (s > largestPoolSize)
                       largestPoolSize = s;
                   workerAdded = true;
               }
           } finally {
               mainLock.unlock();
           }
           if (workerAdded) {
            // 启动当前worker里的线程,因为Thread里的Runnable是Worker对象,所以此处会执行Worker的run()方法
               t.start();
               workerStarted = true;
           }
       }
    } finally {
       if (! workerStarted)
           addWorkerFailed(w);
    }

    2、Worker的销毁

    Worker对象的生命周期随着内部属性thread线程的结束而结束。当线程结束了,会从ThreadPoolExecutor的Worker集合属性workers中删除,从而回收Worker对象。

    每个Worker都会从ThreadPoolExecutor的队列中获取任务(详见getTask()方法),当从队列中无法获取到任务时,根据是否是核心线程和allowCoreThreadTimeOut可以确定当前线程是否需要退出。如果线程需要退出,会执行processWorkerExit方法,如下:

    try {
       while (task != null || (task = getTask()) != null) {
           w.lock();
           if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) &&
               !wt.isInterrupted())
               wt.interrupt();
           // 省略执行获取到的任务代码
           .
           .
           .
       }
       completedAbruptly = false;
    } finally {
    // 当无法获取到任务,while循环退出,执行此方法
       processWorkerExit(w, completedAbruptly);
    }

    当Worker的processWorkerExit被调用,则表明一个线程退出了,第一个参数为当前线程的包装对象Worker,第二个参数代表是否突然、非正常、发生异常退出,如何理解都行。反正不是正常流程退出,就是发生了各种可能的意外。

    processWorkerExit详解如下:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是突然退出的,需要修改线程池ctl的值,保证ctl的值与真实存在的线程数相等
       if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
           decrementWorkerCount();

       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           completedTaskCount += w.completedTasks;
           // 从集合中删除封装当前线程的worker对象
           workers.remove(w);
       } finally {
           mainLock.unlock();
       }

       tryTerminate();

       int c = ctl.get();
       // 如果当前的ctl值小于STOP,即表示线程池当前的状态为RUNNING或者SHUTDOWN
       if (runStateLessThan(c, STOP)) {
        // 如果突然、非正常退出,此处不执行。否则需要保证线程池至少有一个核心线程
           if (!completedAbruptly) {
               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
               if (min == 0 && ! workQueue.isEmpty())
                   min = 1;
               if (workerCountOf(c) >= min)
                   return; // replacement not needed
           }
           // 当前线程异常突然退出了,需要重新启动一个线程。会重新创建一个Worker对象哦!
           addWorker(null, false);
       }
    }

    三、Worker获取并执行任务

    1、线程的重复利用

    线程池的一个很重要的优点就是通过重复利用已创建的线程,降低线程创建和销毁造成的资源消耗。那么,此处容易产生疑问,一个线程是如何重复利用的呢?答案就在Worker的获取并执行任务方法runWorker()里。接下来,我们一起探秘。

    首先,我们要确定一个知识点,通过上面的学习,我们知道线程池里的线程启动后,会执行Worker里的run()方法。

    在Worker的run()方法里只有一行代码,那就是调用runWorker(this),把Worker本身当作参数传入。下面,我们来看runWorker方法是如何保障线程重复利用的,如下图:

    final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
       w.unlock(); // allow interrupts
       boolean completedAbruptly = true;
       try {
        // 通过一个while循环,不断的调用getTask方法从线程池队列里获取提交的Runnabel任务
           while (task != null || (task = getTask()) != null) {
               // 省略代码,直接看重点
               .
               .
               .
               // 如果获取到了任务,直接调用run()方法。此处需要注意,调用run方法并不是启动线程,只是简单的接口调用!
               // task.run()方法里就是我们提交的任务的具体实现,以执行我们的业务逻辑。
               // 每执行完一个任务,就通过while循环继续从队列中获取下一个,依次执行run方法。
               task.run();
               
               // 省略后面的代码。。
               .
               .
               .
                 
           }
           completedAbruptly = false;
       } finally {
           processWorkerExit(w, completedAbruptly);
       }
    }

    通过上面的讲解,我们可以总结一句话:一个线程启动后,循环从线程池队列中获取提交的任务,并调用其run()方法。

    此方法保证了一个线程可以执行多个任务,从而达到线程的重复利用。

    2、获取提交任务

    前面,我们讲完了线程池里的线程是如何执行一个提交的任务的。那么,线程是如何获取我们提交的任务呢?答案藏在Worker类的getTask()方法里。接下来,我们一起探秘。

    private Runnable getTask() {
       boolean timedOut = false; // Did the last poll() time out?

       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);

           // 此处判断的关键在于rs >= STOP || workQueue.isEmpty()
           // 1、如果线程池为STOP状态,那么是不允许获取任务了,符合线程池STOP状态立马停止任务的执行
           // 2、如果线程池空了,那么也不允许获取任务了
           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 修改线程池ctl的值,线程数量减1
               decrementWorkerCount();
               return null;
           }

           int wc = workerCountOf(c);

           // 判断allowCoreThreadTimeOut,是否所有线程允许超时退出,或者当前线程数大于核心线程数,也代表线程可以退出
           // allowCoreThreadTimeOut默认为false;而wc > corePoolSize则表示线程里当前线程数是否已经超过了核心线程数
           // 如果已经超出,则代表创建了最大线程。而最大线程是可以被回收的。
           boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
     
           if ((wc > maximumPoolSize || (timed && timedOut))
               && (wc > 1 || workQueue.isEmpty())) {
               if (compareAndDecrementWorkerCount(c))
                   return null;
               continue;
           }

           try {
            // 获取任务的关键点:通过timed属性的值,以不同的方式从队列中获取任务。
            // 1、当timed为true,代表线程允许结束,那么等待指定时间后,还没有新任务提交则返回null,线程就退出了
            // 2、当timed为false,代表线程不允许结束,那么执行队列的take()方法,一直阻塞。直到有新任务提交
            //    通过一直阻塞,可以保证当前线程永远不退出。
            // 此处有一个经典面试题:线程池是如何保证核心线程不退出的。答案就在此处。
               Runnable r = timed ?
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   workQueue.take();
               if (r != null)
                   return r;
               timedOut = true;
           } catch (InterruptedException retry) {
               timedOut = false;
           }
       }
    }

  • 相关阅读:
    Mysql 5.7.X 小版本升级
    java-php-python-ssm-员工信息管理系统-计算机毕业设计
    如何把网站的http改成https?
    汇川使用笔记6:TCP服务端和客户端通讯
    Spring+SpringMVC+Mybatis的回顾
    GraphQL入门之使用ApolloServer构建GraphQL服务
    板凳----Linux/Unix 系统编程手册 25章 进程的终止
    Flink SQL: RESET Statements
    Spring 中更加简单的存储 (五大类注解 + @bean 注解) 和读取 (属性注入 + Setter 注入 + 构造方法注入) 对象
    【Linux】:常见指令理解(3)
  • 原文地址:https://blog.csdn.net/JACK_SUJAVA/article/details/127871336