• 并发基础(四):线程池


    尺有所短,寸有所长;不忘初心,方得始终。

    请关注公众号:星河之码

    线程池技术是一种多线程处理形式,将任务添加到队列中,通过线程池中创建出来的现成执行这些任务,省去了创建线程和销毁线程的消耗,我认为有点类似JAVA中Spring托管对象的思想在里面

    一、为啥要用线程池

    使用线程池可以根据系统的需求和硬件环境灵活的控制线程的数量,对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力。总的来说有三点:

    • 降低资源消耗

      通过系统的需求和硬件环境创建线程数,重复利用已创建的线程,降低线程创建和销毁造成的消耗。

    • 提高响应速度

      当任务到提交时,不需要创建线程,立即可以执行,执行完不需要销毁线程,线程池回收,最大限度的提高效率。

    • 提高线程的可管理性

      线程由线程池进行统一分配、调优和监控,提高系统的稳定性。

    二、线程池应该怎么设计

    理解了上面为啥要用线程池,有了目的也就有了方向,有了方向就知道设计一个线程池思路了

    • 先要有一个队列,存储要执行的任务

      线程数有限,多余任务放在队列中等待执行

    • 线程池中放多少个线程,同一时间线程池中可以执行多少个线程,得有一个大小限制

      线程池也不能无限大小

    • 啥时候创建线程放在线程池中呢

    • 所有的线程都是一直存活不销毁的吗

    • 队列满了以后,后面的任务怎么处理

    • 线程池本身的生命周期时什么

    当我们理解了这些问题之后,设计一个线程就会有方向跟思路了,同样的,我们也能围绕这些问题更好的去理解线程池的原理实现了。

    三、线程池工作流程

    3.1 线程池的创建

    在Java中,通过ThreadPoolExecutor 实现线程池技术,线程池的工作原理机其实主要就是在说ThreadPoolExecutor的工作流程

    ThreadPoolExecutor是jdk自带的,在java.util.concurrent包中

    ThreadPoolExecutor有四个构造方法,可以提供给我们创建线程池实例。


    实际上四个构造方法中真正被使用的是第四个,也就是有7个参数的那个,其他三个都是通过默认的参数调用的第四个,源码如下

    在第四个构造方法中才是真正创建ThreadPoolExecutor对象的地方

    3.2 线程池参数

    通过上述的源码中的构造函数可以看到,创建线程池一共有7个 参数

    • corePoolSize

      线程池核心线程数最大值

    • maximumPoolSize

      线程池最大线程数大小

    • keepAliveTime

      线程池中非核心线程空闲的存活时间大小

    • unit

      线程空闲存活时间单位

    • workQueue

      存放任务的阻塞队列

    • threadFactory

      用于设置创建线程的工厂,可以给创建的线程设置名字,可方便排查问题。

      一般业务中会有多个线程池,可以根据线程池的名字去定位是哪一个线程出问题

    • handler

      线程池的饱和策略事件,主要有四种类型。

    3.3 线程池执行流程

    • 当提交一个新的任务到线程池时,首先判断线程池中的存活线程数是否小于corePoolSize,如果小于说明有空闲的核心线程,线程池会创建一个核心线程去处理提交的任务。

    • 如果线程池中的存活线程数是已经等于corePoolSize,新提交的任务会被存放进任务队列workQueue排队等待执行。

    • 如果存活线程数已满,并且任务队列workQueue也满了,线程池会判断存活线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没满,则创建一个非核心线程执行提交的任务。

    • 如果当前存活的线程数达到了maximumPoolSize,则针对新的任务直接采用拒绝策略处理

    四、阻塞队列

    当线程池中的活跃线程数达到【核心线程数】的时候,新的任务就会别放在阻塞队列中等待执行

    在ThreadPoolExecutor 的构造函数中的有一个BlockingQueue的参数。常用的阻塞队列都是BlockingQueue接口的实现类,常用的主要有以下几种:

    • ArrayBlockingQueue

      有界队列:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序

    • LinkedBlockingQueue

      可设置容量队列:基于链表结构的阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序

      • 容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE
      • 吞吐量通常要高于ArrayBlockingQuene
      • 静态工厂方法Executors.newFixedThreadPool()线程池使用了这个队列
    • DelayQueue

      延迟队列:任务定时周期的延迟执行的队列,根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序

      • newScheduledThreadPool线程池使用了这个队列
    • PriorityBlockingQueue

      优先级队列:支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排列

    • SynchronousQueue

      同步队列:不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态

      • 吞吐量通常要高于LinkedBlockingQuene
      • newCachedThreadPool线程池使用了这个队列

    五、四种拒绝策略

    拒绝策略也叫饱和策略:当线程池中的活跃线程数达到【最大线程数】的时候,线程池接收到新的任务就会执行拒绝策略

    在ThreadPoolExecutor 的构造函数中的有一个RejectedExecutionHandler的参数。jdk默认有是个实现类,即四种拒绝策略

    • AbortPolicy

      丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。

      使用这个策略必须处理抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。

    • DiscardPolicy

      直接丢弃任务,不做任何处理

    • DiscardOldestPolicy

      丢弃阻塞队列 workQueue 中最老的一个任务,将当前这个任务继续提交给线程池,加入队列中

    • CallerRunsPolicy(交给线程池调用所在的线程进行处理)

      交给线程池调用所在的线程进行处理,即调用线程池的主线程处理

      由于是主线程自己处理,相当于没有用线程池,一般并发比较小,性能要求不高可以用,否则可能导致程序阻塞。

    六、动态调整线程池参数

    在日常开发中,我们可能需要根据实际的业务场景调整线程池的参数,ThreadPoolExecutor针对其构造方法为我们提供了几个方法可以调整常用的参数

    以上参数也可以通过动态配置中心进行动态修改

    线程池中最重要的参数,API以及提供了动态更新的方法。可以配合动态配置中心进行动态修改。

    1. 先对线程池进行监控
    2. 当队列数量过多或过少的时候,以及业务指标的时候,可以动态修改线程池核心参数来调整。

    七、监控线程池

    当我们需要对线程池参数进行调整,而又很难确定 corePoolSize, workQueue,maximumPoolSize等参数达到什么样的大小时才符合业务指标的时候,我们就需要对线程池进行监控

    同样的ThreadPoolExecutor也提供了可以监控线程池的使用情况的几个方法:

    方法含义
    getActiveCount()线程池中正在执行任务的线程数量
    getCompletedTaskCount()线程池已完成的任务数量,该值小于等于taskCount
    getCorePoolSize()线程池的核心线程数量
    getLargestPoolSize()线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
    getMaximumPoolSize()线程池的最大线程数量
    getPoolSize()线程池当前的线程数量
    getTaskCount()线程池已经执行的和未执行的任务总数

    八、向线程池提交任务

    ThreadPoolExecutor提供了两个方法向线程池提交任务,分别为execute()和submit()方法

    • execute

      execute()方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功

       public static void main(String[] args) {
              ThreadPoolExecutor threadPoolExecutor =
                      new ThreadPoolExecutor(10,20,5,
                              TimeUnit.MINUTES,new LinkedBlockingQueue<>() );
              threadPoolExecutor.execute(new Runnable() {
                  @Override
                  public void run() {
                      System.out.println("ThreadPoolExecutorTest 测试----->>>>>子线程执行 : "+ Thread.currentThread().getName());
                  }
              });
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • submit

      submit方法用于提交需要返回值的任务。调用submit方法线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值。

      get()方法会阻塞主线程直到任务完成,而使用get(long timeout,TimeUnit unit),在指定的时间内会等待任务执行,超时则抛出超时异常,等待时候会阻塞当前线程

      public static void main(String[] args) {
          ThreadPoolExecutor threadPoolExecutor =
                  new ThreadPoolExecutor(10,20,5,
                          TimeUnit.MINUTES,new LinkedBlockingQueue<>() );
          threadPoolExecutor.submit(new Runnable() {
              @Override
              public void run() {
                  System.out.println("ThreadPoolExecutorTest 测试----->>>>>子线程执行 : " + Thread.currentThread().getName());
              }
          });
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

      ThreadPoolExecutor中submit有三种实现:

    • submit 为什么能提交任务(Runnable)的同时也能返回任务(Future)的执行结果

      通过追踪submit 三个先实现的源码发现,submit方法最终也是调用的execute方法,但是在调用execute之前还调用了一个newTaskFor 方法

      继续追踪newTaskFor 方法,发现newTaskFor 将 task 封装成了 RunnableFuture,最终返回的是FutureTask 这个类。而FutureTask的继承关系如下

      到这里就清晰了,submit之所以提交任务(Runnable)的同时也能返回任务(Future)的执行结果,主要分为两步:

      • 通过execute方法提交执行任务
      • 通过FutureTask返回任务(Future)的执行结果

    九、关闭线程池

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow()

    这两个的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程

    • shutdown()

      不会立即终止线程池,只是将线程池的状态设置成shutdown状态,此时不会接受新的任务,然后等所有任务z都执行完后才终止(包含正在执行和缓存队列中的任务)

    • shutdownNow()

      立即终止线程池,将线程池的状态设置成STOP,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    可以看到上图中还有一个isShutdown的方法,实际上还有其他两个方法

    • isShutDown

      当调用shutdown()或shutdownNow()方法后返回为true

    • isTerminated

      当调用线程池关闭,并且所有提交的任务都执行完成后返回为true

    • isTerminating

      当调用线程池关闭,并且所有提交的任务都执行完成后返回为false

    因此,判断线程池所有线程是否执行完成,可以这样写:

    while(true){
       if(threadPool.isTerminated()) {
           //当调用线程池关闭,并且所有提交的任务都执行完成  跳出循环
           break;//true停止
       }
       Thread.sleep(500);//休眠500继续循环
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    十、线程池的五种运行状态

    线程有六种状态,线程池同样也有状态,在ThreadPoolExecutor的源码中定义了线程池的五种运行状态,分别是:Running、ShutDown、Stop、Tidying、Terminated

    private static final int RUNNING    = -1 << COUNT_BITS;	// 运行中
    private static final int SHUTDOWN   =  0 << COUNT_BITS;	// 已关闭
    private static final int STOP       =  1 << COUNT_BITS; // 已停止
    private static final int TIDYING    =  2 << COUNT_BITS; // 清洁中
    private static final int TERMINATED =  3 << COUNT_BITS; // 已终止
    
    • 1
    • 2
    • 3
    • 4
    • 5

    • Running

      • 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
      • 状态切换:线程池的初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,且线程池中的任务数为0
    • ShutDown

      • 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
      • 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
    • STOP

      • 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且中断正在处理的任务。
      • 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
    • Tidying

      • 状态说明:当所有的任务已终止,任务数量为0,线程池会变为TIDYING状态。

        当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

      • 状态切换:

        当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。

        当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

    • Terminated

      • 状态说明:线程池彻底终止,就变成TERMINATED状态。
      • 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

    十一、 常用线程池的类型

    Java通过Executors提供六种线程池,分别为:

    • newFixedThreadPool :固定数目线程的线程池
    • newCachedThreadPool:可缓存线程的线程池
    • newSingleThreadExecutor:单个线程的线程池
    • newScheduledThreadPool:定时及周期执行的线程池
    • newSingleThreadScheduledExecutor
    • newWorkStealingPool:足够大小的线程池(JDK 1.8 后新加)

    通过Executors源码可以看到,jdk为我们提供了6种线程池,每种提供了有参和物参两个构造方法。

    11.1 newFixedThreadPool

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。源码如下

       public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 特点

      通过以上源码可以看出newFixedThreadPool具有以下特点

      • 核心线程数和最大线程数大小一样
      • 没有所谓的非空闲时间,即keepAliveTime为0
      • 阻塞队列为无界队列LinkedBlockingQueue
    • 工作机制

      • 当有一个新的任务提交到线程池
      • 如果线程数少于核心线程,创建核心线程执行任务
      • 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
      • 如果线程执行完任务,去阻塞队列取任务,继续执行。
    • 使用场景

      newFixedThreadPool适用于处理CPU密集型的任务,能够保证CPU在工作时尽可能的减少线程分配,即适用执行长期稳定的任务。

    • 案例

      线程池大小为10,每个任务输出index后sleep 1秒,因此每秒会打印10个数字。

      public static void main(String[] args) {
      
          ExecutorService executor = Executors.newFixedThreadPool(10);
      
          for (int i = 0; i < 100; i++) {
              final int index = i;
              executor.execute(() -> {
                  try {
                      System.out.println(index);
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              });
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

    newFixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。同时在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。因此定长线程池的大小一般要根据系统资源进行设置。

    11.2 newCachedThreadPool

    创建一个可缓存线程池,当线程空闲时可灵活回收空闲线程,当有新任务时,没有空闲线程则新建线程。源码如下

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 特点

      • 核心线程数为0
      • 最大线程数为Integer.MAX_VALUE
      • 阻塞队列是SynchronousQueue
      • 非核心线程空闲存活时间为60秒
    • 工作机制

      • 当有一个新的任务提交到线程池
      • 任务直接加到SynchronousQueue队列(没有核心线程)。
      • 判断是否有空闲线程,若有,则取出空闲线程任务执行。若无,则新建线程
      • 执行完任务的线程可以存活60秒,在这期间如果接到任务,则继续存活,否则被销毁。
    • 使用场景

      newCachedThreadPool一般用于并发大,执行时间短的小任务。

    • 案例

      如下案例中线程执行休眠了秒钟,任务的提交速度会大于线程执行的速度

      public static void main(String[] args) {
      
              ExecutorService executor = Executors.newCachedThreadPool();
      
              for (int i = 0; i < 100; i++) {
                  final int index = i;
                  executor.execute(() -> {
                      try {
                          Thread.sleep(2000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println(index);
                  });
              }
          }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17

    当提交任务的速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程。这种情况下由于大量线程同时运行,很有会耗尽 CPU 和内存资源,造成系统OOM。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

    11.3 newSingleThreadExecutor

    创建一个单线程化的线程池,即只创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行,源码如下:

    创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 特点

      • 核心线程数与最大线程数都为1
      • 阻塞队列是LinkedBlockingQueue
      • keepAliveTime为0

      可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活跃的

    • 工作机制

      • 当有一个新的任务提交到线程池
      • 判断线程池是否有一条线程在,若无,则新建线程执行任务,若有,则将任务加入阻塞队列。
      • 当前的唯一线程,从队列取任务,执行完一个再取一个,一个线程串行干所有的活。
    • 使用场景

      适用于串行执行任务的场景,一个任务一个任务地执行。

    • 案例

        public static void main(String[] args) {
      
              ExecutorService executor = Executors.newSingleThreadExecutor();
      
              for (int i = 0; i < 100; i++) {
                  final int index = i;
                  executor.execute(() -> {
                      System.out.println(Thread.currentThread().getName()+"正在执行输出数字:" +  index);
                  });
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

      输出结果如下,从结果可知是同一个线程执行了100个任务

    11.4 newScheduledThreadPool

    创建一个定长的线程池,支持定时及周期性任务执行。源码如下

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 特点

      • 最大线程数为Integer.MAX_VALUE
      • 阻塞队列是DelayedWorkQueue
      • keepAliveTime为0
      • scheduleAtFixedRate() :按某种速率周期执行
      • scheduleWithFixedDelay():在某个延迟后执行
    • 工作机制

      • 当有一个新的任务提交到线程池
      • 线程池中的线程从 DelayQueue 中获取 time 大于等于当前时间的task
      • 执行完后修改这个 task 的 time 为下次被执行的时间
      • 将 task 放回DelayQueue队列中
    • 使用场景

      周期性执行任务的场景,

      需要限制线程数量的场景

    • 案例

      这个线程池的案例分为两种:按某种速率周期执行 和 在某个延迟后执行

      • 在某个延迟后执行

          public static void main(String[] args) {
        
                /**
                 * 创建一个给定初始延迟的间隔性的任务,案例中是2秒
                 * 后面的每次执行时间是上一次任务从执行到结束所需要的时间+给定的间隔时间(2秒)
                 */
                ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
                for (int i = 0; i < 10; i++) {
                    executor.schedule(() -> {
                        System.out.println(Thread.currentThread().getName() + "正在执行,delay 2 seconds");
                    }, 2, TimeUnit.SECONDS);
                }
        
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
      • 按某种速率周期执行

          public static void main(String[] args) {
        
                /**
                 * 创建一个给定初始延迟的间隔性的任务,.
                 * 后面每次任务执行时间为 初始延迟 + N * delay(间隔)
                 */
                ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
                for (int i = 0; i < 10; i++) {
                    executor.scheduleAtFixedRate(() -> {
                        System.out.println("延迟 2 秒,每 4 秒执行一次");
                    }, 2, 4, TimeUnit.SECONDS);
        
                }
        
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15

    11.5 newSingleThreadScheduledExecutor

    通过newSingleThreadScheduledExecutor的源码可以知道,其本质上很newScheduledThreadPool一样,也是通过ScheduledThreadPoolExecutor创建线程池,并且核心线程数为1

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 特点

      • 核心线程数为1
      • 最大线程数为Integer.MAX_VALUE
      • 阻塞队列是DelayedWorkQueue
      • keepAliveTime为0
    • 工作机制

      产生一个线程池大小为1的ScheduledExecutorService对象,当任务多于一个时将按先后顺序执行。

    • 案例

      由于其实现也是ScheduledThreadPoolExecutor对象创建,其案例实现跟newScheduledThreadPool一样的

    11.6 newWorkStealingPool

    newWorkStealingPool即任务窃取线程池:具有抢占式操作的线程池,每个线程都有一个任务队列存放任务

    通过以下源码得知,newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,使用ForkJoinPool的好处是,把1个任务拆分成多个【小任务】,把这些小任务分发到多个线程上执行。这些小任务都执行完成后,再将结果合并

        public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Runtime.getRuntime().availableProcessors()是获取当前系统可以的CPU核心数

    ThreadPoolExecutor和ForkJoinPool都是在统一的一个Executors类中实现

    • 特点

      • 可以传入线程的数量,不传入,则默认使用当前计算机中可用的cpu数量

      • 合理的使用CPU进行对任务操作(并行操作),所以newWorkStealingPool适合使用在很耗时的操作。

    • 工作机制

      newWorkStealingPool默认会以当前机器的CPU处理器个数为线程个数,并行处理任务,且不保证顺序,同时并发数能作为参数设置,WorkStealingPool能够做到并行执行与设置的并发数相同的任务数量,多余的任务则会进入等待队列,等待被执行

    • 案例

       public static void main(String[] args) throws InterruptedException {
              // 线程数
              int threads = 10;
              // 用于计数线程是否执行完成
              CountDownLatch countDownLatch = new CountDownLatch(threads);
              
              ExecutorService executorService = Executors.newWorkStealingPool();
              executorService.execute(() -> {
                  try {
                      System.out.println(Thread.currentThread().getName());
                  } catch (Exception e) {
                      System.out.println(e);
                  } finally {
                      countDownLatch.countDown();
                  }
              });
              countDownLatch.await();
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    十二、为什么不建议使用 Executors静态工厂构建线程池

    Java通过Executors提供六种线程池,我们一般使用线程池也可以直接通过Executors对象new一个线程池,如下

    但是在阿里巴巴Java开发手册,明确指出不允许使用Executors静态工厂构建线程池,这又是为什么呢

    其主要原因是:【规避资源耗尽,清晰线程池的运行规则】。

    • Executors返回的线程池对象的弊端

      • FixedThreadPool 和 SingleThreadPool

        这两个线程池允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM

      • CachedThreadPool 和 ScheduledThreadPool

        这两个线程池允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    • 创建线程池的正确方式

      避免使用Executors创建线程池,主要是避免使用其中的默认实现,我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量,设置最大线程数

      private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
              60L, TimeUnit.SECONDS,
              new ArrayBlockingQueue(10));
      
      • 1
      • 2
      • 3

      创建线程池也可以使用开源类库:开源类库,如apache和guava等。

  • 相关阅读:
    java集合类史上最细讲解 - Collections工具类
    聚观早报 | 苹果秋季发布会定档9月7日;​饿了么与抖音达成合作
    java 多线程&线程状态的切换——67
    【OpenCV实现图像:用OpenCV图像处理技巧之白平衡算法2】
    [附源码]Python计算机毕业设计SSM浪漫烘焙屋(程序+LW)
    江西服装学院图书馆藏《乡村振兴战略下传统村落文化旅游设计》许少辉八一新著
    Kerberos (二) --------- Hadoop Kerberos 配置
    远程监控高并发高吞吐java进程
    [附源码]java毕业设计电子病历信息管理系统
    【编程题】【Scratch二级】2021.03 寻找宝石
  • 原文地址:https://blog.csdn.net/Edwin_Hu/article/details/126214214