• Java-多线程-ThreadPoolExecutor


    前言

    前面我们讲解线程的时候,讲到了使用Executors创建线程池,但是它里面所有方法可变的参数太少,不能很好的进行自定义设置,以及以后的扩展,更合理的使用cpu线程的操作,所以使用ThreadPoolExecutor创建线程池是最好的使用方式

    1、快速使用

    public static void main(String[] args) throws Exception{
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,
            3,
            500,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("我的线程池-");
                    return thread;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()
        );
    
        // execute用于执行没有返回值的实现了Runnable的方法
        // 还有一个submit方法可以执行Runnable和Callable的有返回值的方法
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    System.out.println(Thread.currentThread().getName() + ":" + i);
                }
            }
        });
    
        // 记得一定要关闭线程池,最好放在finally里面,线程执行用try包裹住
        executor.shutdown();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2、参数讲解

    在这里插入图片描述

    使用ThreadPoolExecutor创建线程池,只能使用这一个有参构造创建,有7个参数,下面进行讲解

    • int corePoolSize:核心线程池的大小
    • int maximumPoolSize:最大线程池的大小
    • long keepAliveTime:非核心线程池的空闲时间
    • TimeUnit unit:存活时间的单位
    • BlockingQueue orkQueue:阻塞/工作队列,常用的有
      • ArrayBlockingQueue
      • LinkedBlockingQueue
      • SynchronousQueue
      • 在这里插入图片描述
    • ThreadFactory threadFactory:线程池里面线程的线程创建工厂
    • RejectedExecutionHandler handler:拒绝策略,有四种
      • AbortPolicy:如果超出最大线程数+队列排队限制数,则抛出异常
      • CallerRunsPolicy:如过超出,则由创建线程池的线程来执行方法
      • DiscardOldestPolicy:如果超出,则将队列中最前面等待的线程任务弹出,当前线程从排队序列后面加入进去
      • DiscardPolicy:不做任何处理,超出则不执行线程,不做任何操作

    在这里插入图片描述

    3、源码解析

    3.1、线程池的五种状态

    RUNNING(111):正常运行中

    SHUTDOWN(000):关闭线程池,不接受新任务。线程池内队列以及正在执行的任务会正常执行完毕

    STOP(001):关闭线程池,不接受新任务。线程池内所有线程也强制关闭

    TIDYING(010):过度状态

    TERMINATED(011):线程池凉凉

    // 核心线程池的数量,int类型,使用了原子类进行加减
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 32-3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 1<<29:00100000 00000000 00000000 00000000
    // -1:	 00011111 11111111 11111111 11111111	
    // 000代表线程池的状态,后面代表线程池的容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // -1<<29:111.....后面不管	RUNNING:111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 0<<29:000...	SHUTDOWN:000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 1<<29:001...	STOP:001
    private static final int STOP       =  1 << COUNT_BITS;
    // 2<<29:010... TIDYING:010
    private static final int TIDYING    =  2 << COUNT_BITS;
    //3<<29:011... TERMINATED:011
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.2、线程池的状态切换流程

    在这里插入图片描述

    3.3、execute方法

    在将来某个时候执行给定的任务。任务可以在新线程或现有的合并的线程中执行。如果任务无法提交执行,由于此执行程序已关闭或已达到其容量,该任务将由当前的RejectedExecutionHandler处理。

    public void execute(Runnable command) {
        // 健壮性判断
        if (command == null)
            throw new NullPointerException();
    
        // 获取crl的数量
        int c = ctl.get();
    
        // 工作线程的个数是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 通过add方法,添加一个核心线程去执行command任务
            if (addWorker(command, true))
                // 添加核心线程成功,返回true,直接return结束
                return;
            // 如果在并发情况下,添加核心线程失败的线程,需要重新获取一次ctl属性
            c = ctl.get();
        }
        // 创建核心线程失败的情况
        // 判断当前线程池状态是否为RUNNING
        // 如果是RUNNING,执行offer方法将任务添加到工作队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 添加任务到工作队列成功
            int recheck = ctl.get();
            // 判断线程池是否是RUNNING状态,如果不是RUNNING状态,需要将任务从工作队列移除
            if (! isRunning(recheck) && remove(command))
                // 执行拒绝策略(线程池状态不正确)
                reject(command);
            // 判断工作线程是否为0
            else if (workerCountOf(recheck) == 0)
                // 工作线程数为0,但是工作队列中有任务在排队
                // 添加一个非核心空任务的线程,为了处理在工作队列排队的任务
                addWorker(null, false);
        }
        // 添加任务到工作队列失败,添加非核心线程去执行当前任务
        else if (!addWorker(command, false))
            // 添加非核心线程失败,执行reject拒绝策略
            reject(command);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.4、addWorker方法

    boolean addWorker(Runnable firstTask, boolean core):第一个参数为线程任务,第二个参数为创建核心/非核心线程的标识

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 对线程池的判断,以及对工作线程数量的判断
        // 设置标签,跳出外层循环
        retry:
        for (;;) {
            // 获取ctl的值
            int c = ctl.get();
            // 拿到线程池的状态
            int rs = runStateOf(c);
    
            // 如果线程池状态不是RUNNING,就再次做后续判断,查看当前任务是否可以不处理
            if (rs >= SHUTDOWN &&
                // 线程池状态为SHUTDOWN,并且任务为空,并且工作队列不为空
                // 如果同时满足了这三个要求,那就是要处理工作队列当前中的任务
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                // 只要不是RUNNING状态,不处理新任务
                // 如果SHUTDOWN状态,并且满足了之前addWorker(null,false),并且工作队列有任务时,不能走当前位置
                return false;
    
            for (;;) {
                // 基于ctl获取当前工作线程数量
                int wc = workerCountOf(c);
                // 判断工作线程数是否大于最大值
                if (wc >= CAPACITY ||
                    // 如果是核心线程,是否大于设置的corePoolSize,如果是非核心线程,是否大于maximunPoolSize
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 当前工作线程已经达到最大值了
                    return false;
                // 以CAS的方式,对工作线程数+1,如果成功
                if (compareAndIncrementWorkerCount(c))
                    // 直接跳出外层for循环
                    break retry;
                // 重新获取ctl的值
                c = ctl.get();
                // 基于新获取的ctl拿到线程池状态,判断和之前的rs状态是否一直
                if (runStateOf(c) != rs)
                    // 说明并发操作导致线程池状态变化,需要重新判断状态
                    continue retry;
            }
        }
    
        // 添加工作线程,并启动工作线程
        // 工作线程是否启动了
        boolean workerStarted = false;
        // 工作线程是否添加了
        boolean workerAdded = false;
        // Worker就是工作线程
        Worker w = null;
        try {
            // new Worker勾线工作线程,将任务扔到了Worker中
            w = new Worker(firstTask);
            // 拿到了Worker中绑定的Thread线程
            final Thread t = w.thread;
            // 肯定部位null,健壮性判断
            if (t != null) {
                // 加锁。。。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 基于重新获取的ctl,拿到线程池的状态
                    int rs = runStateOf(ctl.get());
    				// 如果满足线程池状态为RUNNING,就添加工作任务
                    if (rs < SHUTDOWN ||
                        // 如果线程池状态为SHUTDOWN,并且传入的任务为null
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 开始添加工作线程
                        // 判断当前线程是否处于run状态(健壮性判断)
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 将构件好的Worker对象添加到了workers
                        workers.add(w);
                        // 获取工作线程个数
                        int s = workers.size();
                        // 如果现在的工作线程数,大于历史最大的工作线程数,就重新赋值给largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    3.5、Worker对象

    private final class Worker
        extends AbstractQueuedSynchronizer // 线程中断
        implements Runnable // 存储需要执行的任务
    {
    
        // 工作线程的Thread对象
        final Thread thread;
        // 需要执行的任务
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        
        Worker(Runnable firstTask) {
            // 刚刚初始化的工作线程不允许被中断
            setState(-1); 
            // 第一次new的时候,会将任务赋值给firstTask
            this.firstTask = firstTask;
            // Worker构建Thread对象
            this.thread = getThreadFactory().newThread(this);
        }
    
        // 调用t.start(),执行当前的run方法
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    3.6、runWorker方法

    用于启动Worker对象中的线程

    final void runWorker(Worker w) {
        // 拿到当前工作线程
        Thread wt = Thread.currentThread();
        // 拿到Worker对象中封装的任务
        Runnable task = w.firstTask;
        // 将Worker的firstTask归为null
        w.firstTask = null;
        // 将Worker的state归为0,代表可以被中断
        w.unlock();
        // 执行任务时,钩子函数中是否出现异常的标识
        boolean completedAbruptly = true;
        try {
            // 获取任务的第一个方式,就是执行execute、submit时,传入的任务直接处理
            // 获取任务的第二个方式,就是从工作队列中获取任务执行
            while (task != null || (task = getTask()) != null) {
                // 加锁,在SHUTDOWN状态下,当前线程不允许被中断
                // 并且Worker内部实现的锁,并不是可重入锁,因为在中断时,也需要对worker进行lock,不能获取就代表当前工作线程正在执行任务
                w.lock();
                // 如果线程池状态变为了STOP状态,必须将当前线程中断
                // 第一个判断:判断当前线程池状态是否是STOP
                // 第二个判断:查看中断标记为,并归位,如果为false,说明不是STOP,如果变为true,需要再次查看是否并发操作导致线程池为STOP
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                    // 查询当前线程中断标记是否为false,如果为false,就执行wt.interrupt()
                    !wt.isInterrupted())
                    // 将中断标记设置为true
                    wt.interrupt();
                try {
                    // 执行任务的钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务的钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将task置为null
                    task = null;
                    // 执行成功的任务个数+1
                    w.completedTasks++;
                    // 将state标记位设置为0
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    3.7、getTask方法

    从工作队列中获取任务

    private Runnable getTask() {
        // 表示(非核心线程可以干掉)
        boolean timedOut = false; 
    
        for (;;) {
            // 获取ctl表示
            int c = ctl.get();
            // 拿到了线程池的状态
            int rs = runStateOf(c);
    
            // 如果进入if,需要干掉当前工作线程
            // 线程池状态为SHUTDOWN、STOP
            // 如果线程池状态大于等于STOP、需要移除掉当前工作线程
            // 如果线程池状态为SHUTDOWN、并且工作队列为空,需要移除掉当前工作线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 移除当前工作线程
                decrementWorkerCount();
                return null;
            }
    
            // 重新获取工作线程的个数
            int wc = workerCountOf(c);
    
            // allowCoreThreadTimeOut:是否允许核心线程超时(一般为false)
            // 工作线程是否大于核心线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if (
                // 工作线程是否已经大于最大线程数了
                // 工作线程数大于核心线程数,并且当前线程已经超时
                // 尝试干掉当前线程
                (wc > maximumPoolSize || (timed && timedOut))
                // 工作线程数大于1、或者工作队列为空
                // 如果工作队列为空,我就干掉我自己
                // 如果工作线程数大于1,我就干掉我自己
                && (wc > 1 || workQueue.isEmpty())) {
                // 基于CAS的方式移除掉当前线程,只有一个线程会CAS成功
                if (compareAndDecrementWorkerCount(c))
                    // 返回null,交给processWorkerExit移除当前工作线程
                    return null;
                continue;
            }
            
            // ==================从工作队列获取任务=============================
            try {
                Runnable r = timed ?
                    // 阻塞一定时间从工作队列拿任务(可以理解为非核心走这个方法)
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 一直阻塞!(可以理解为核心线程走这个)
                    workQueue.take();
                if (r != null)
                    // 如果拿到任务直接返回执行。。
                    return r;
                // 从队列获取任务时,超时了(达到了当前工作线程的最大生存时间)
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    3.8、processWorkerExit方法

    移除当前工作线程

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果执行processWorkerExit方法的操作不是getTask中的操作,是直接因为异常引起的。(一般是钩子函数中抛出异常)
        if (completedAbruptly)
            decrementWorkerCount();
    
        // 加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 记录当前线程池一共处理了多少个任务
            completedTaskCount += w.completedTasks;
            // 移除工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 尝试将线程池关系(到过度状态 - 销毁状态)
        tryTerminate();
    
        // 重新获取ctl
        int c = ctl.get();
        // 当前线程池状态,进到这,说明是RUNNING、SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 如果是正常状态移除当前工作线程
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 说明是不正常的方式移除了当前工作线程,再添加一个工作线程
            addWorker(null, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    Dubbo面试题(总结最全面的面试题)
    FullCalendar日历插件说明文档
    WPF实时时间显示demo(MVVM)
    对1GHz脉冲多普勒雷达进行快速和慢速处理生成5个移动目标的距离多普勒图研究(Matlab代码实现)
    ospf 单区域配置
    如何使用SpringCloud Eureka 创建单机Eureka Server-注册中心
    01-安装
    【C++】unordered_map与unorder_set的封装(哈希桶)
    7 张图解 CrashLoopBackOff,如何发现问题并解决它?
    在CentOs7中设置tomcat应用systemd启动服务
  • 原文地址:https://blog.csdn.net/qq_57404736/article/details/127952729