• Java之juc旅途-Executor(四)


    概述

    Executor作为一个底层接口,定义了线程的执行方法:

    public interface Executor {
        void execute(Runnable command);
    }
    
    • 1
    • 2
    • 3

    其中该包中的ExecutorService继承了Executor是一个运用更广泛的接口,它直接定义了线程池的操作方法:

    public interface ExecutorService extends Executor {
    	// 关闭线程池,会等待正在执行中的线程执行完
        void shutdown();
    	
    	// 立刻关闭线程池,如果有正在执行的线程,会被中断
        List<Runnable> shutdownNow();
    	
    	// 判断线程池是否被关闭了
        boolean isShutdown();
        
        // 判断线程池是否终止了,这是线程池的最终状态
        boolean isTerminated();
        
        // 等待给定时间后终止线程池
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        
        // 提交一个执行任务,返回一个未来的引用,这个引用可以获取到执行结果
        <T> Future<T> submit(Callable<T> task);
        
        // 提交一个执行任务,返回一个未来的引用,这个引用可获取到结果是传入的参数result
        <T> Future<T> submit(Runnable task, T result);
        
        // 提交一个执行任务,返回一个未来的引用,这个引用可以获取到执行结果,入参跟Callable相比就是入参的范围比较广了,只要实现Runable接口即可
        Future<?> submit(Runnable task);
    
    	// 执行所有传入的任务,等待所有任务结束才返回,结果都在返回的list里面,
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    	
    	// 跟上面方面相比,会有一个时间限制完成所有的任务,如果时间耗尽,有些任务没执行完,则没有结果
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
    	// 返回任意一个执行的结果
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    	
    	// 给定时间内返回任意一个执行结果
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    而AbstractExecutorService是实现ExecutorService接口的抽象类,定制了线程池的一些通用操作。
    ThreadPoolExecutor继承AbstractExecutorService,对线程池的细节进行实现。

    ForkJoinPool继承AbstractExecutorService,针对计算密集型的需求,把大的任务拆分成多个小任务(即fork),然后再将多个小任务处理汇总到一个结果上(即join)的设计实现。

    ScheduledExecutorService是继承ExecutorService接口的接口,定制了线程循环或者延迟的调度操作接口,而对应的ScheduledThreadPoolExecutor继承了ThreadPoolExecutor跟ScheduledExecutorService,定制线程池管理的线程可以进行循环或者延迟的操作。

    同包下有个Executors,给使用者提供上面三个实例类的一些常用的创建实例方法。

    ThreadPoolExecutor

    如果没有做线程的复用并且并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。
    所以为了复用线程就做了一个线程池,线程池为线程的开销和资源不足问题提供了解决方案,通过对多个任务复用线程,线程创建的开销被分摊到了多个任务上。
    java中ThreadPoolExecutor就是基础线程池的实现。

    核心参数

    ThreadPoolExecutor有七个核心的参数:

    1. corePoolSize:核心线程数量,这部分线程不会被销毁。
    2. workQueue:保存等待执行的任务的阻塞队列,线程池的核心线程都有任务了,这时候新进来的任务都在队列里等着。
    3. maximumPoolSize:最大线程数量,就相当于线程池的计划工作线程数量。当核心线程都有任务了,并且等待队列也满了,这时候就会按照这个最大数量去招一些外包线程(创建新的外包线程)来处理新来的任务。
    4. keepAliveTime: 外包线程的存活时间。当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则额外的线程将被销毁。(注意的是其实线程池根本不会去标识线程核心还是外包,它只保证两者的数量,所以在销毁时只会销毁那些没任务的)
    5. unit:这个用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS。
    6. threadFactory:用来指定创建新线程的工厂,默认使用Executors.defaultThreadFactory()来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
    7. handler:线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:AbortPolicy:直接抛出异常,这是默认策略;CallerRunsPolicy:用调用者所在的线程来执行任务;DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy:直接丢弃任务。

    状态

    线程池有5种状态:

    1. RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;
    2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
    3. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
    4. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
    5. TERMINATED:在terminated() 方法执行完后进入该状态。

    在这里插入图片描述

    核心代码

    execute()

    	    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();   
    	    // clt记录着runState和workerCount,即线程池状态和有效线程数
            int c = ctl.get();
            
            // workerCountOf方法取出低29位的值,表示当前活动的线程数
            // 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中(调用addWorker创建线程执行任务)
            if (workerCountOf(c) < corePoolSize) {
            	// addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
                // 如果为true,根据corePoolSize来判断;
                // 如果为false,则根据maximumPoolSize来判断
                if (addWorker(command, true))
                    return;
                // 如果添加失败,则重新获取ctl值
                c = ctl.get();
            }
            // 如果不小于corePoolSize,则将任务添加到workQueue队列
            // 如果当前线程池是运行状态并且任务添加到队列成功
            if (isRunning(c) && workQueue.offer(command)) {
            	// 重新获取ctl值
                int recheck = ctl.get();
                // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
                // 需要移除该command,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                /*
                 * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
                 * 这里传入的参数表示:
                 * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
                 * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
                 * 如果判断workerCount不等于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
                 */
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    
            /*
    	     * 如果执行到这里,有两种情况:
    	     * 1. 线程池已经不是RUNNING状态;
    	     * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
    	     * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
    	     * 如果失败则拒绝该任务
    	     */
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

    1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
    2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中
    3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
    4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

    addWorker()

    addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                // 获取运行状态
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                /*
    	         * 这个if判断
    	         * 如果rs >= SHUTDOWN,则表示线程池状态为STOP或者TIDYING或者TERMINATED,此时不再接收新任务;
    	         * 接着判断以下3个条件,只要有1个不满足,则返回false:
    	         * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
    	         * 2. firsTask为空
    	         * 3. 阻塞队列不为空
    	         *
    	         * 首先考虑rs == SHUTDOWN的情况
    	         * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
    	         * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
    	         * 因为队列中已经没有任务了,不需要再添加线程了
    	         */
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                	// 获取线程数
                    int wc = workerCountOf(c);
                    // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
                    // 在创建非核心线程时,即core等于false时,判断当前线程数是否大于等于maximumPoolSize,
                    // 在core等于true时,判断当前线程数是否大于等于corePoolSize,
                    // 如果大于等于则返回false。
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 尝试CAS增加workerCount,如果成功,则跳出所有for循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    // 如果增加workerCount失败,则重新获取ctl的值
                    c = ctl.get();  // Re-read ctl
                    // 如果当前的运行状态不等于rs,说明状态已被改变,返回到第一个for循环外继续执行
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
            	// 根据firstTask来创建Worker对象
                w = new Worker(firstTask);
                // 每一个Worker对象都会创建一个线程
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
    					// rs < SHUTDOWN表示是RUNNING状态;
                   	 	// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                    	// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // workers是一个HashSet();
                            workers.add(w);
                            int s = workers.size();
                            // largestPoolSize记录着线程池中出现过的最大线程数量
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                    	// 启动这个线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    Worker

    线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:

    	private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            // thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            // firstTask用它来保存传入的任务
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
            	// 禁止在执行任务前对线程进行中断
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // 可以看到在创建Worker时会调用threadFactory来创建一个线程
                // newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
            	// addWorker中的t.start()就会触发该run方法
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。Worker继承了AQS,使用AQS来实现独占锁的功能, 用于判断线程是否空闲以及是否可以被中断。

    监控

    通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:

    • getTaskCount:线程池已经执行的和未执行的任务总数;
    • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
    • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
    • getPoolSize:线程池当前的线程数量;
    • getActiveCount:当前线程池中正在执行任务的线程数量。

    通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

    ForkJoinPool

    ForkJoinPool与ThreadPoolExecutor结构相似,它可以通过规则将一个任务分割为可附加的多个子任务,并行执行这些子任务,最终对子任务的运算结果进行快速排序算法等聚合运算,得到最终结果。

    使用ForkJoinPool的第一个原则是确保任务拆分的合理性,它除了拆分任务之外还有另一个更强大的特性,就是它实现了工作窃取。它的线程池中的每一个线程,都有着自己的专属任务队列,线程会优先处理自己队列中的任务,如果队列是空的,就会去其他线程的队列中寻找任务。因此,即便400万个任务当中,某个任务执行时间很长,ForkJoinPool中的其他线程也可以完成其余的任务。而ThreadPoolExecutor就做不到这点了,如果这种情况发生在它身上,其他线程无法接手额外的任务。

    在这里插入图片描述

    ScheduledThreadPoolExecutor

    它主要用来在给定的延迟之后运行任务,或者定期执行任务。从构造方法可以得知,它做了一个拥有无界队列DelayedWorkQueue的线程池:

     public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在ScheduledThreadPoolExecutor的任务用ScheduledFutureTask来定义,他包含三个核心参数:

    • time:表示这个任务将要被执行的具体时间。
    • sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
    • period:表示任务执行的间隔周期。

    DelayedWorkQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumner小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

    添加任务

    1. 获取Lock。
    2. 添加任务。
      • 向PriorityQueue添加任务。
      • 如果添加的任务是PriorityQueue的头元素,唤醒在Condition中等待的所有线程。
    3. 释放Lock。

    获取任务

    1. 获取Lock。
    2. 获取周期任务。
      1. 如果PriorityQueue为空,当前线程到Condition中等待;否则执行下一步。
      2. 如果PriorityQueue的头元素的time时间比当前时间大,到Condition中等待time时间;否则执行下一步。
      3. 获取PriorityQueue头元素;如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程。
    3. 释放Lock。

    Executors

    newCachedThreadPool

    创建一个不限制线程数量但是会有过期时间的线程池,其队列使用的是SynchronousQueue,一种添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为1的阻塞队列, 所以提交的任务基本都会新建(如果没有空闲)一个线程去处理。因为使用的是无限大的线程数量,所以当任务过多时会有内存溢出的危险。
    这个线程池通常会提高执行许多短期异步任务的程序的性能。

     public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    newFixedThreadPool

    创建一个线程不会被销毁的线程池,因为使用的是无限大的阻塞队列,所以当任务过多时会有内存溢出的危险。

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    newSingleThreadExecutor

    创建只有一个线程的线程池,省去切换线程的损耗,因为使用的是无限大的阻塞队列,所以当任务过多时会有内存溢出的危险。

     public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    newWorkStealingPool

    创建一个可以窃取的ForkJoinPool(当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行)。

    public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }
    
    public static ExecutorService newWorkStealingPool(int parallelism) {
            return new ForkJoinPool
                (parallelism,
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    newScheduledThreadPool

    创建一个可调度的线程池,因为底层的线程池的最大线程数量不限,所以当任务过多时会有内存溢出的危险。

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
     public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    newSingleThreadScheduledExecutor

    创建一个线程只有1的可调度线程池,如果任务多于一个,任务将按先后顺序执行。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    JavaScript基础 JavaScript第三天 1. for - 循环 && 2. 数组
    OpenAI 的视频生成大模型Sora的核心技术详解(一):Diffusion模型原理和代码详解
    GaN HEMTs在电力电子应用中的交叉耦合与基板电容分析与建模
    Jenkins pipeline流程控制选项
    [强网杯 2019]随便注
    podman创建helloWord镜像实例-参考docker
    系统的去学习一门编程语言,原来有如此捷径
    retrofit-spring-boot-starter这款轻量级 HTTP 神器好用到爆
    动态时间规整算法——DTW
    【Vue】provider/inject 祖孙传值
  • 原文地址:https://blog.csdn.net/h295928126/article/details/126004935