• Java线程池源码解析


    1. 线程池的继承关系

    从上图可以看出来最顶层的接口为 Executor ,下面看一下这个接口中的方法

    1. public interface Executor {
    2. //只有一个方法execute
    3. void execute(Runnable command);
    4. }

    从代码中可以看出来只有一个 execute 方法。这也是我常用的一个来运行 Runable 的一种方式。然后看一下继承了 Executor 接口的 ExecutorService 接口中有哪些我们熟悉的而常用的方法

    1. public interface ExecutorService extends Executor {
    2.    // 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
    3.    //写例子的时候用到(PS在实际的项目组基本上没有用到,反正我是没有)
    4.    void shutdown();
    5.    //关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
    6.     //这个也是基本上没用到
    7.    List<Runnable> shutdownNow();
    8.    // 线程池是否已关闭
    9.   // 还是没有用到
    10.    boolean isShutdown();
    11.    // 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
    12.    //尴尬没用过
    13.    boolean isTerminated();
    14.    //一脸懵逼没用过
    15.    boolean awaitTermination(long timeout, TimeUnit unit)
    16.        throws InterruptedException;
    17.   //带返回值的
    18.    <T> Future<T> submit(Callable<T> task);
    19.    //带返回值的 -- 这个很少用
    20.    <T> Future<T> submit(Runnable task, T result);
    21.    //带返回值---(成功返回值为null 有兴趣的可以去尝试一下,源码的英文注释上面有说明)
    22.    Future<?> submit(Runnable task);
    23.    //批量全部执行
    24.    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    25.        throws InterruptedException;
    26.    //批量全部执行--在规定的时间内
    27.    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    28.                                  long timeout, TimeUnit unit)
    29.        throws InterruptedException;
    30.   //任意一个先执行完就返回
    31.    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    32.        throws InterruptedException, ExecutionException;
    33.    //任意一个先执行完就返回
    34.    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    35.                    long timeout, TimeUnit unit)
    36.        throws InterruptedException, ExecutionException, TimeoutException;
    37. }

    演示代码:

    1. public class InvokeAllTest {
    2.    public static void main(String[] args) throws  Exception{
    3.        ExecutorService service = Executors.newFixedThreadPool(10);
    4.        Collection<Test> a = new ArrayList<>();
    5.        for(int i = 0; i < 10; ++i){
    6.            a.add(new Test());
    7.       }
    8.        //System.out.println( service.invokeAny(a));
    9.        System.out.println( service.invokeAll(a));
    10.   }
    11. }
    12. class Test implements Callable<String>{
    13.    /**
    14.     * Computes a result, or throws an exception if unable to do so.
    15.     *
    16.     * @return computed result
    17.     * @throws Exception if unable to compute a result
    18.     */
    19.    @Override
    20.    public String call() throws Exception {
    21.        TimeUnit.SECONDS.sleep((int)(Math.random()*10));
    22.        return Thread.currentThread().getName();
    23.   }
    24. }

    看一下最后一个接口 ScheduledExecutorService 计划执行接口,从命名上就不难看出来这个用于执行任务的。

    1. public interface ScheduledExecutorService extends ExecutorService {
    2.    /**
    3.     * 创建并执行在给定延迟之后启用的一次性操作。
    4.     */
    5.    public ScheduledFuture<?> schedule(Runnable command,
    6.                                       long delay, TimeUnit unit);
    7.    /**
    8.     * 创建并执行在给定延迟之后启用的一次性操作。返回ScheduledFuture<V>
    9.     */
    10.    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
    11.                                           long delay, TimeUnit unit);
    12.    /**
    13.     * 按指定频率周期执行某个任务。
    14.     */
    15.    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    16.                                                  long initialDelay,
    17.                                                  long period,
    18.                                                  TimeUnit unit);
    19.    /**
    20.     * 按指定频率间隔执行某个任务。
    21.     */
    22.    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    23.                                                     long initialDelay,
    24.                                                     long delay,
    25.                                                     TimeUnit unit);
    26. }

    另外,由于线程池支持获取线程执行的结果,所以,引入了 Future 接口,RunnableFuture 继承自此接口,然后我们最需要关心的就是它的实现类 FutureTask。到这里,记住这个概念,在线程池的使用过程中,我们是往线程池提交任务(task),使用过线程池的都知道,我们提交的每个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,然后再提交到线程池。这样,读者才能比较容易记住 FutureTask 这个类名:它首先是一个任务(Task),然后具有 Future 接口的语义,即可以在将来(Future)得到执行的结果。

    2. AbstractExecutorService

    接着来看一下在抽象类 AbstractExecutorService 实现了哪些方法

    1. public abstract class AbstractExecutorService implements ExecutorService {
    2.    /**
    3.     * Runnable 转换为 Callable 的方法带指定返回值
    4.     */
    5.    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    6.        return new FutureTask<T>(runnable, value);
    7.   }
    8.    /**
    9.     * Runnable 转换为 Callable 的方法,不带指定返回值
    10.     */
    11.    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    12.        return new FutureTask<T>(callable);
    13.   }
    14.    /**
    15.     *
    16.     */
    17.    public Future<?> submit(Runnable task) {
    18.        if (task == null) throw new NullPointerException();
    19.        //这里看一看出来在Runnable submit方法返回值为Future get的值为null
    20.        RunnableFuture<Void> ftask = newTaskFor(task, null);
    21.        execute(ftask);
    22.        return ftask;
    23.   }
    24.    public <T> Future<T> submit(Runnable task, T result) {
    25.        if (task == null) throw new NullPointerException();
    26.         //这里看一看出来在Runnable submit方法返回值为Future get的值为result
    27.        RunnableFuture<T> ftask = newTaskFor(task, result);
    28.        execute(ftask);
    29.        return ftask;
    30.   }
    31.    /**
    32.     * Callable类型
    33.     */
    34.    public <T> Future<T> submit(Callable<T> task) {
    35.        if (task == null) throw new NullPointerException();
    36.        RunnableFuture<T> ftask = newTaskFor(task);
    37.        execute(ftask);
    38.        return ftask;
    39.   }
    40.    /**
    41.     * 返回任意一个执行完成的结果
    42.     */
    43.    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
    44.                              boolean timed, long nanos)
    45.        throws InterruptedException, ExecutionException, TimeoutException {
    46.        if (tasks == null)
    47.            throw new NullPointerException();
    48.        int ntasks = tasks.size();
    49.        if (ntasks == 0)
    50.            throw new IllegalArgumentException();
    51.        //Future列表
    52.        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    53.        ExecutorCompletionService<T> ecs =
    54.            new ExecutorCompletionService<T>(this);
    55.        try {
    56.            
    57.            ExecutionException ee = null;
    58.            //截止时间---0就是没有截止时间
    59.            final long deadline = timed ? System.nanoTime() + nanos : 0L;
    60.            Iterator<? extends Callable<T>> it = tasks.iterator();
    61.          
    62.            futures.add(ecs.submit(it.next()));
    63.            --ntasks;
    64.            int active = 1;
    65.            for (;;) {
    66.                //返回已经完成的任务Future<T> 没有就返回null -- 不停的循环轮询
    67.                Future<T> f = ecs.poll();
    68.                if (f == null) {
    69.                    if (ntasks > 0) {
    70.                        --ntasks;
    71.                        futures.add(ecs.submit(it.next()));
    72.                        ++active;
    73.                   }
    74.                    else if (active == 0)
    75.                        break;
    76.                    else if (timed) {
    77.                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
    78.                        if (f == null)
    79.                            throw new TimeoutException();
    80.                        nanos = deadline - System.nanoTime();
    81.                   }
    82.                    else
    83.                        f = ecs.take();
    84.               }
    85.                if (f != null) {
    86.                    --active;
    87.                    try {
    88.                        return f.get();
    89.                   } catch (ExecutionException eex) {
    90.                        ee = eex;
    91.                   } catch (RuntimeException rex) {
    92.                        ee = new ExecutionException(rex);
    93.                   }
    94.               }
    95.           }
    96.            if (ee == null)
    97.                ee = new ExecutionException();
    98.            throw ee;
    99.       } finally {
    100.            for (int i = 0, size = futures.size(); i < size; i++)
    101.                futures.get(i).cancel(true);
    102.       }
    103.   }
    104.    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    105.        throws InterruptedException, ExecutionException {
    106.        try {
    107.            return doInvokeAny(tasks, false, 0);
    108.       } catch (TimeoutException cannotHappen) {
    109.            assert false;
    110.            return null;
    111.       }
    112.   }
    113.    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    114.                           long timeout, TimeUnit unit)
    115.        throws InterruptedException, ExecutionException, TimeoutException {
    116.        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    117.   }
    118.    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    119.        throws InterruptedException {
    120.        if (tasks == null)
    121.            throw new NullPointerException();
    122.        //创建返回值Future 的列表
    123.        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    124.        boolean done = false;
    125.        try {
    126.            //放入线程池运行
    127.            for (Callable<T> t : tasks) {
    128.                RunnableFuture<T> f = newTaskFor(t);
    129.                futures.add(f);
    130.                execute(f);
    131.           }
    132.            //等待运行完成
    133.            for (int i = 0, size = futures.size(); i < size; i++) {
    134.                Future<T> f = futures.get(i);
    135.                if (!f.isDone()) {
    136.                    try {
    137.                        f.get();
    138.                   } catch (CancellationException ignore) {
    139.                   } catch (ExecutionException ignore) {
    140.                   }
    141.               }
    142.           }
    143.            done = true;
    144.            return futures;
    145.       } finally {
    146.            if (!done)
    147.                //将没有运行完成的线程直接取消掉
    148.                for (int i = 0, size = futures.size(); i < size; i++)
    149.                    futures.get(i).cancel(true);
    150.       }
    151.   }
    152.    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    153.                                         long timeout, TimeUnit unit)
    154.        throws InterruptedException {
    155.        if (tasks == null)
    156.            throw new NullPointerException();
    157.        long nanos = unit.toNanos(timeout);
    158.        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    159.        boolean done = false;
    160.        try {
    161.            //创建任务数组
    162.            for (Callable<T> t : tasks)
    163.                futures.add(newTaskFor(t));
    164. //截止时间
    165.            final long deadline = System.nanoTime() + nanos;
    166.            final int size = futures.size();
    167.            //减去提交的时间
    168.            for (int i = 0; i < size; i++) {
    169.                execute((Runnable)futures.get(i));
    170.                nanos = deadline - System.nanoTime();
    171.                //小于0直接返回现有的
    172.                if (nanos <= 0L)
    173.                    return futures;
    174.           }
    175. //处理每个获取的时间
    176.            for (int i = 0; i < size; i++) {
    177.                Future<T> f = futures.get(i);
    178.                if (!f.isDone()) {
    179.                    if (nanos <= 0L)
    180.                        return futures;
    181.                    try {
    182.                        f.get(nanos, TimeUnit.NANOSECONDS);
    183.                   } catch (CancellationException ignore) {
    184.                   } catch (ExecutionException ignore) {
    185.                   } catch (TimeoutException toe) {
    186.                        //发现timeout直接返回
    187.                        return futures;
    188.                   }
    189.                    nanos = deadline - System.nanoTime();
    190.               }
    191.           }
    192.            done = true;
    193.            return futures;
    194.       } finally {
    195.            if (!done)
    196.                //返回后发现还有在运行的直接cacel掉
    197.                for (int i = 0, size = futures.size(); i < size; i++)
    198.                    futures.get(i).cancel(true);
    199.       }
    200.   }
    201. }

    从上面可以看出来上面方法实现主要是通过调用 executeExecutorCompletionService 这个类。来实现了 submit , doInvokeAny ,invokeAll 这些方法。

    3. 看看最常用的实现 ThreadPoolExecutor

    首先我们来看一下 ThreadPoolExecutor 类中包含的成员变量进行逐一的分析

    1. //从开始的继承图可以看出来 ThreadPoolExecutor继承了AbstractExecutorService
    2. public class ThreadPoolExecutor extends AbstractExecutorService {
    3.    /**
    4.     * 主线程池控制状态ctl是一个atomic整型封装了两个概念字段
    5.     *
    6.     *   线程数量, 定义了有效的线程数量
    7.     *   运行状态,   定义了:运行状态,关闭状态等等。
    8.     *
    9.     * 为了封装成一个整数, 我们限制线程的数量为
    10.     * (2^29)-1 (about 500 million) 而不是 (2^31)-1
    11.     * 如果将来出现这种情况,可以将变量更改为AtomicLong,并调整下面的shift/mask常量。
    12.     * 但是在需要之前,这段代码使用int会更快、更简单。
    13.     * 工作线程是允许启动和停止的,工作线程可能会有和存活的线程有短暂的数量不同
    14.     *
    15.     *   RUNNING: 接受新任务并处理排队的任务
    16.     *   SHUTDOWN: 不接受新任务但是处理排队任务
    17.     *   STOP:     不接受新任务不接受排队任务,并且中断在处理中的任务
    18.     *            
    19.     *   TIDYING(整理): 所有的任务中止, 工作线程为0,转换到状态清理的线程将运行terminate()钩子方法
    20.     *            
    21.     *   TERMINATED: terminated() 已经完成
    22.     *
    23.     * The numerical order among these values matters, to allow
    24.     * ordered comparisons. The runState monotonically increases over
    25.     * time, but need not hit each state. The transitions are:
    26.     *
    27.     * RUNNING -> SHUTDOWN
    28.     *     调用shutdown(),或者在finalize()中调用shutdown()
    29.     * (RUNNING or SHUTDOWN) -> STOP
    30.     *   调用shutdownNow()
    31.     * SHUTDOWN -> TIDYING
    32.     *   当队列和线程池都为空的时候
    33.     * STOP -> TIDYING
    34.     *   当线程池为空
    35.     * TIDYING -> TERMINATED
    36.     *   terminated() 方法执行完成
    37.     *
    38.     */
    39.    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    40.    //线程的数量的表示位--低29位表示线程数量
    41.    private static final int COUNT_BITS = Integer.SIZE - 3;
    42.    //最大的线程的容量(2^29)-1
    43.    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    44.    // runState 用int的高三位表示
    45.    //11100000000000000000000000000000
    46.    private static final int RUNNING    = -1 << COUNT_BITS;
    47.    
    48.    //00000000000000000000000000000000
    49.    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    50.    
    51.    //00100000000000000000000000000000
    52.    private static final int STOP       =  1 << COUNT_BITS;
    53.    
    54.    //01000000000000000000000000000000
    55.    private static final int TIDYING    =  2 << COUNT_BITS;
    56.    
    57.    //01100000000000000000000000000000
    58.    private static final int TERMINATED =  3 << COUNT_BITS;
    59.    // 拆解出运行状态
    60.    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    61.    
    62.    //拆解出来线程数量
    63.    private static int workerCountOf(int c) { return c & CAPACITY; }
    64.    
    65.    //把运行状态和线程数量打包成一个整数
    66.    private static int ctlOf(int rs, int wc) { return rs | wc; }
    67.    /**
    68.     * 线程的增加和减少都是通过CAS来进行的
    69.     */
    70.    private boolean compareAndIncrementWorkerCount(int expect) {
    71.        return ctl.compareAndSet(expect, expect + 1);
    72.   }
    73.    private boolean compareAndDecrementWorkerCount(int expect) {
    74.        return ctl.compareAndSet(expect, expect - 1);
    75.   }
    76.    private void decrementWorkerCount() {
    77.        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    78.   }
    79.    /**
    80.     * 阻塞队列
    81.     */
    82.    private final BlockingQueue<Runnable> workQueue;
    83.    /**
    84.     * 非公平的重入锁
    85.     */
    86.    private final ReentrantLock mainLock = new ReentrantLock();
    87.    /**
    88.     * 仅在持有主锁mainLock时访问
    89.     * .
    90.     */
    91.    private final HashSet<Worker> workers = new HashSet<Worker>();
    92.    private final Condition termination = mainLock.newCondition();
    93.    private int largestPoolSize;
    94.    private long completedTaskCount;
    95.    private volatile ThreadFactory threadFactory;
    96.    /**
    97.     * 当执行饱和或关闭时调用处理Handler
    98.     */
    99.    private volatile RejectedExecutionHandler handler;
    100.    /**
    101.     * 闲置的线程等待超时时间
    102.     */
    103.    private volatile long keepAliveTime;
    104.    /**
    105.     * 是否允许核心线程超时
    106.     */
    107.    private volatile boolean allowCoreThreadTimeOut;
    108.    /**
    109. * 核心线程池的大小
    110.     */
    111.    private volatile int corePoolSize;
    112.    /**
    113.     * 线程的极大值
    114.     *
    115.     */
    116.    private volatile int maximumPoolSize;
    117.    /**
    118.     * 默认的被拒执行的Handler
    119.     */
    120.    private static final RejectedExecutionHandler defaultHandler =
    121.        new AbortPolicy();
    122. //Worker实现了AQS和Runnable的接口
    123.    private final class Worker
    124.        extends AbstractQueuedSynchronizer
    125.        implements Runnable
    126.   {
    127.        
    128.        private static final long serialVersionUID = 6138294804551838833L;
    129.      
    130.        final Thread thread;
    131.      
    132.        Runnable firstTask;
    133.      
    134.        volatile long completedTasks;
    135.        Worker(Runnable firstTask) {
    136.            setState(-1); // inhibit interrupts until runWorker
    137.            this.firstTask = firstTask;
    138.            this.thread = getThreadFactory().newThread(this);
    139.       }
    140.        /** Delegates main run loop to outer runWorker */
    141.        public void run() {
    142.            runWorker(this);
    143.       }
    144.        protected boolean isHeldExclusively() {
    145.            return getState() != 0;
    146.       }
    147.        protected boolean tryAcquire(int unused) {
    148.            if (compareAndSetState(0, 1)) {
    149.                setExclusiveOwnerThread(Thread.currentThread());
    150.                return true;
    151.           }
    152.            return false;
    153.       }
    154.        protected boolean tryRelease(int unused) {
    155.            setExclusiveOwnerThread(null);
    156.            setState(0);
    157.            return true;
    158.       }
    159.        public void lock()       { acquire(1); }
    160.        public boolean tryLock() { return tryAcquire(1); }
    161.        public void unlock()     { release(1); }
    162.        public boolean isLocked() { return isHeldExclusively(); }
    163.        void interruptIfStarted() {
    164.            Thread t;
    165.            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    166.                try {
    167.                    t.interrupt();
    168.               } catch (SecurityException ignore) {
    169.               }
    170.           }
    171.       }
    172.   }

    4. execute 的实现

    从上面可以看出来上面方法实现主要是通过调用 executeExecutorCompletionService 这个类。来实现了 submit , doInvokeAny ,invokeAll 这些方法。下面就来看一下 execute 这方法在 ThreadPoolExecutor 中是如何实现的。

    1. public void execute(Runnable command) {
    2.        if (command == null)
    3.            throw new NullPointerException();
    4.        /*
    5.         * Proceed in 3 steps:
    6.         *
    7.         * 1 如果运行的线程小于corePoolSize,则尝试以给定命令作为其第一个任务启动新线程。对
    8.         * addWorker的调用以原子方式检查runState和workerCount,从而通过返回false防止在不
    9.         * 应该添加线程时添加错误警报。
    10.         *
    11.         * 2. 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加线程(因为上次检
    12.         * 查后已有线程死亡),或者是否应该在进入这个方法后关闭线程池。因此,我们重新检查状
    13.         * 态,如果有必要的话,在停止时回滚队列,或者在没有线程时启动新线程。
    14.         *
    15.         * 3. 如果无法对任务排队,则尝试添加新线程。如果它失败了,我们知道我们被关闭或饱和,所以拒绝任务。
    16.         * 所以拒绝任务。
    17.         *
    18.         */
    19.    
    20.     //获取线程池中线程数量---默认是0
    21.        int c = ctl.get();
    22.        //如果设置了核心线程数先判断核心线程数是不是已经满了
    23.        if (workerCountOf(c) < corePoolSize) {
    24.            if (addWorker(command, true))
    25.                return;
    26.            c = ctl.get();
    27.       }
    28.        //判断线程池是否处于运行状态并且还能往队列添加任务
    29.        if (isRunning(c) && workQueue.offer(command)) {
    30.            int recheck = ctl.get();
    31.            //双重检查---如果不是运行状态从队列中删除任务
    32.            if (! isRunning(recheck) && remove(command))
    33.                //根据传入的不同策略处理器处理问题
    34.                reject(command);
    35.            else if (workerCountOf(recheck) == 0)
    36.                addWorker(null, false);
    37.       }
    38.        //添加非核心线程任务
    39.        else if (!addWorker(command, false))
    40.            //添加失败根据传入的不同的策略处理器处理问题
    41.            reject(command);
    42.   }

    execute 方法主要做了三件事情:

    1. 添加核心处理线程
    2. 线程池在运行状态,添加任务到任务阻塞队列中
    3. 新增非核心线程处理

    在线程池构造函数中有设置keepAliveTime,这个设置的就是非coreThread的存活时间。

    通过上面的源码发现主要是 addWorker 方法:

    1.  private boolean addWorker(Runnable firstTask, boolean core) {
    2.        //增加Worker的数量
    3.        retry:
    4.        for (;;) {
    5.            int c = ctl.get();
    6.            int rs = runStateOf(c);
    7.            // Check if queue empty only if necessary.
    8.            if (rs >= SHUTDOWN &&
    9.                ! (rs == SHUTDOWN &&
    10.                   firstTask == null &&
    11.                   ! workQueue.isEmpty()))
    12.                return false;
    13.            for (;;) {
    14.                int wc = workerCountOf(c);
    15.                if (wc >= CAPACITY ||
    16.                    wc >= (core ? corePoolSize : maximumPoolSize))
    17.                    return false;
    18.                if (compareAndIncrementWorkerCount(c))
    19.                    break retry;
    20.                c = ctl.get();  // Re-read ctl
    21.                if (runStateOf(c) != rs)
    22.                    continue retry;
    23.           }
    24.       }
    25.        //创建Worker并且启动
    26.        boolean workerStarted = false;
    27.        boolean workerAdded = false;
    28.        Worker w = null;
    29.        try {
    30.            w = new Worker(firstTask);
    31.            final Thread t = w.thread;
    32.            if (t != null) {
    33.                final ReentrantLock mainLock = this.mainLock;
    34.                mainLock.lock();
    35.                try {
    36.                    // Recheck while holding lock.
    37.                    // Back out on ThreadFactory failure or if
    38.                    // shut down before lock acquired.
    39.                    int rs = runStateOf(ctl.get());
    40.                    if (rs < SHUTDOWN ||
    41.                       (rs == SHUTDOWN && firstTask == null)) {
    42.                        if (t.isAlive()) // precheck that t is startable
    43.                            throw new IllegalThreadStateException();
    44.                        workers.add(w);
    45.                        int s = workers.size();
    46.                        if (s > largestPoolSize)
    47.                            largestPoolSize = s;
    48.                        workerAdded = true;
    49.                   }
    50.               } finally {
    51.                    mainLock.unlock();
    52.               }
    53.                if (workerAdded) {
    54.                    t.start();
    55.                    workerStarted = true;
    56.               }
    57.           }
    58.       } finally {
    59.            if (! workerStarted)
    60.                addWorkerFailed(w);
    61.       }
    62.        return workerStarted;
    63.   }

    上面的代码也是做了两件事情:

    1. 增加worker数量的统计
    2. 创建新的Worker并且启动

    在线程池中的任务处理主要是靠一个 Worker 的内部类进行处理。下面来看一下这个内部类:

    1. private final class Worker
    2.        extends AbstractQueuedSynchronizer
    3.        implements Runnable
    4.   {
    5.        /**
    6.         * This class will never be serialized, but we provide a
    7.         * serialVersionUID to suppress a javac warning.
    8.         */
    9.        private static final long serialVersionUID = 6138294804551838833L;
    10.        /** Thread this worker is running in. Null if factory fails. */
    11.        final Thread thread;
    12.        /** Initial task to run. Possibly null. */
    13.        Runnable firstTask;
    14.        /** Per-thread task counter */
    15.        volatile long completedTasks;
    16.        /**
    17.         * Creates with given first task and thread from ThreadFactory.
    18.         * @param firstTask the first task (null if none)
    19.         */
    20.        Worker(Runnable firstTask) {
    21.            setState(-1); // inhibit interrupts until runWorker
    22.            this.firstTask = firstTask;
    23.            this.thread = getThreadFactory().newThread(this);
    24.       }
    25.        /** Delegates main run loop to outer runWorker */
    26.        public void run() {
    27.            runWorker(this);
    28.       }
    29.        protected boolean isHeldExclusively() {
    30.            return getState() != 0;
    31.       }
    32.        protected boolean tryAcquire(int unused) {
    33.            if (compareAndSetState(0, 1)) {
    34.                setExclusiveOwnerThread(Thread.currentThread());
    35.                return true;
    36.           }
    37.            return false;
    38.       }
    39.        protected boolean tryRelease(int unused) {
    40.            setExclusiveOwnerThread(null);
    41.            setState(0);
    42.            return true;
    43.       }
    44.        public void lock()       { acquire(1); }
    45.        public boolean tryLock() { return tryAcquire(1); }
    46.        public void unlock()     { release(1); }
    47.        public boolean isLocked() { return isHeldExclusively(); }
    48.        void interruptIfStarted() {
    49.            Thread t;
    50.            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    51.                try {
    52.                    t.interrupt();
    53.               } catch (SecurityException ignore) {
    54.               }
    55.           }
    56.       }
    57.   }

    Worker 继承了 AbstractQueuedSynchronizer 实现了 RunnableWorker 中有两个变量:

    • Thread变量
    • Runnable变量

    第一个是在创建Worker的时候,把Worker变成线程保存起来,也就是通过这样的方式来处理任务,Runnable保存的是创建Worker的时候执行的任务。那么这个Worker的run方法什么时候执行。在前面执行 addWorker 方法的时候,会有一个创建Worker的过程,然后调用了Thread.start()方法。这样就会执行到Worker的run方法,而在run方法中调用的是 ThreadPoolExecutor.runWorker 参数是当前Worker的实例:

    1. final void runWorker(Worker w) {
    2.        Thread wt = Thread.currentThread();
    3.        Runnable task = w.firstTask;
    4.        w.firstTask = null;
    5.        w.unlock(); // allow interrupts
    6.        boolean completedAbruptly = true;
    7.        try {
    8.            while (task != null || (task = getTask()) != null) {
    9.                w.lock();
    10.                if ((runStateAtLeast(ctl.get(), STOP) ||
    11.                     (Thread.interrupted() &&
    12.                      runStateAtLeast(ctl.get(), STOP))) &&
    13.                    !wt.isInterrupted())
    14.                    wt.interrupt();
    15.                try {
    16.                    beforeExecute(wt, task);
    17.                    Throwable thrown = null;
    18.                    try {
    19.                        task.run();
    20.                   } catch (RuntimeException x) {
    21.                        thrown = x; throw x;
    22.                   } catch (Error x) {
    23.                        thrown = x; throw x;
    24.                   } catch (Throwable x) {
    25.                        thrown = x; throw new Error(x);
    26.                   } finally {
    27.                        afterExecute(task, thrown);
    28.                   }
    29.               } finally {
    30.                    task = null;
    31.                    w.completedTasks++;
    32.                    w.unlock();
    33.               }
    34.           }
    35.            completedAbruptly = false;
    36.       } finally {
    37.            processWorkerExit(w, completedAbruptly);
    38.       }
    39.   }

    首先获取Worker中的需要处理的任务去处理,当处理完成Worker中的通过获取getTask任务列表中的任务进行处理。根据是否有核心处理线程(Worker)来是否要退出当前Worker:

    1. private Runnable getTask() {
    2.        boolean timedOut = false; // Did the last poll() time out?
    3.        for (;;) {
    4.            int c = ctl.get();
    5.            int rs = runStateOf(c);
    6.            // Check if queue empty only if necessary.
    7.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    8.                decrementWorkerCount();
    9.                return null;
    10.           }
    11.            int wc = workerCountOf(c);
    12.            // 默认情况下core线程不会失效
    13.            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    14.            if ((wc > maximumPoolSize || (timed && timedOut))
    15.                && (wc > 1 || workQueue.isEmpty())) {
    16.                if (compareAndDecrementWorkerCount(c))
    17.                    return null;
    18.                continue;
    19.           }
    20.            try {
    21.                //根据是否失效调用任务列表的不同方法
    22.                Runnable r = timed ?
    23.                    //调用poll,在规定时间内还没有就返回null
    24.                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    25.               //没有就阻塞当前线程
    26.                    workQueue.take();
    27.                if (r != null)
    28.                    return r;
    29.                timedOut = true;
    30.           } catch (InterruptedException retry) {
    31.                timedOut = false;
    32.           }
    33.       }
    34.   }

    5. 线程池化的模型图

    • 从池的空闲线程列表中选择一个 Thread,并且指派它去运行一个已提交的任务(一个 Runnable,Callable 的实现)

    • 当任务完成时,将该 Thread 返回给该列表,使其可被重用。

    6. 线程池拒绝策略

    • CallerRunsPolicy:在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务

    1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
    2.      
    3.        public CallerRunsPolicy() { }
    4.      
    5.        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    6.            if (!e.isShutdown()) {
    7.                r.run();
    8.           }
    9.       }
    10.   }

    AbortPolicy:直接抛出异常

    1. public static class AbortPolicy implements RejectedExecutionHandler {
    2.        
    3.        public AbortPolicy() { }
    4.        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    5.            throw new RejectedExecutionException("Task " + r.toString() +
    6.                                                 " rejected from " +
    7.                                                 e.toString());
    8.       }
    9.   }

    DiscardPolicy:会让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。

    1. public static class DiscardPolicy implements RejectedExecutionHandler {
    2.        public DiscardPolicy() { }
    3.        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4.       }
    5.   }

    DiscardOldestPolicy:DiscardOldestPolicy策略的作用是,当任务呗拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。

    1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    2.        public DiscardOldestPolicy() { }
    3.        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4.            if (!e.isShutdown()) {
    5.                e.getQueue().poll();
    6.                e.execute(r);
    7.           }
    8.       }
    9.   }

    自定义策略,只要实现RejectedExecutionHandler接口

     

  • 相关阅读:
    LeetCode(力扣)78. 子集Python
    研发过程中的文档管理与工具
    String解析及其方法
    JS选择框全选效果(从入门到优化)
    参考文献批量添加上标
    Logback 日志格式参数说明
    CISP考试有哪些备考技巧
    使用git发布(删除/更改)GitHub仓库中的内容
    前缀和和差分和dijkstra算法和二分算法和floyd算法
    Google Earth Engine(GEE)——利用行列号进行影像去云,使用研究区大于单景影像的研究区范围
  • 原文地址:https://blog.csdn.net/m0_67698950/article/details/125426244