• 第十四章《多线程》第8节:线程池


    系统启动一个线程的成本是比较高的,因为启动线程的操作要与操作系统交互。如果程序中需要创建大量生存期较短的线程,那么使用线程池将会大幅度提高程序的运行效率。线程池中保存了一定数目可重复使用的线程,因此可以在使用时直接从线程池中获得一个线程,用完之后还可以把线程放回线程池以便后面再次使用。

    线程池在系统启动时就创建大量线程,程序将一个Runnable或Callable实现类对象传递给线程池,线程池就会启动一个空闲线程来执行它们的run()方法或call()方法。当run()方法或call()方法执行完之后,线程并不会死亡,而是再次返回线程池中等待执行下一个对象的run()方法或call()方法。

    使用线程池还有一个好处就是能够控制系统中并发线程的数量。当系统包含大量并发线程时,会导致系统性能下降,而线程池的最大线程数参数控制系统中并发线程不会超过设定值。

    14.8.1使用Executors产生线程池

    Java语言使用ExecutorService和ScheduledExecutorService这两个接口来表示线程池,而使用Executors类生成线程池。Executors位于java.util.concurrent包下,这个类提供了一些静态方法来产生线程池,这些方法如表14-6所示。

    表14-6 Executors类的方法

    方法

    功能

    newCachedThreadPool()

    创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中

    newFixedThreadPool(int nThreads)

    创建一个可重用的、具有固定线程数的线程池

    newSingleThreadExecutor()

    创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为1

    newScheduledThreadPool(int corePoolSize)

    创建具有指定线程数的线程池,它可以在指定的时长后执行线程任务

    newSingleThreadScheduledExecutor()

    创建只有一个线程的线程池,它可以在指定的时长后后执行线程任务

    newWorkStealingPool(int parallelism)

    创建持有足够线程的线程池来支持给定的并行级别

    newWorkStealingPool()

    该方法是前一个方法的简化版本,相当于为前一个方法传入4作为参数

    以上这些方法中,newScheduledThreadPool()newSingleThreadScheduledExecutor()这两个方法的返回值是ScheduledExecutorService类型,其他方法的返回值是ExecutorService类型。

    ExecutorService代表尽快执行的线程池,也就是说只要把代表任务的Runnable对象或Callable对象交给线程池,线程池就会尽快执行该任务。ExecutorService提供了三个版本的submit()方法用于把任务提交给线程池,如表14-7所示。

    表14-7 ExecutorService提交任务的方法

    编号

    方法

    1

    Future submit(Runnable task)

    2

    Future submit(Runnable task, T result)

    3

    Future submit(Callable task)

    1号版本的submit()方法作用是是将task提交给线程池,线程池会尽快执行任务。这个方法的返回值类型是Future,程序员可以用这个Future类型的返回值获得线程执行的返回值。由于run()方法没有返回值,所以用Future的get()方法所获得的返回值为null,但能用Future的isDone()和isCancelled()方法得到线程的执行状态。

    2号版本的submit()方法与前一个版本作用一样,区别只在于程序员可以把Future的返回值显示的指定为result。

    3号版本的submit()方法用于把一个Callable对象提交给线程池,线程池会尽快执行任务,而Future的get()方法能够获得call()方法的返回值。

    与ExecutorService不同,ScheduledExecutorService表示可以在指定延迟后或周期性执行任务的线程池,它提供了如表14-8所示的4个方法用于向线程池提交任务。

    表14-8 ScheduledExecutorService提交任务的方法

    编号

    方法

    1

    ScheduledFuture schedule(Runnable command,long delay, TimeUnit unit)

    2

    ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit)

    3

    ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,                                              long period, TimeUnit unit)

    4

    ScheduledFuture scheduleWithFixedDelay(Runnable command,long initialDelay,

    long delay, TimeUnit unit)

    以上这些方法中,1号方法是让command任务在指定的delay延迟后执行。2号方法是让callable任务在指定的delay延迟后执行。3号方法是让command任务在指定的delay延迟后执行,并且以period为周期重复执行。4号方法是让command任务在指定的delay延迟后执行,并且以period为周期重复执行,每两次执行任务之间也会有delay时间长度的间隔。

    用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭操作。调用shutdown()方法后的线程池不再接收新任务,但会将以前所有已提交任务执行完成。当线程池中的所有任务都执行完成后,池中的所有线程都会死亡,另外也可以调用线程池的shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

    使用线程池来执行线程任务的步骤如下。

    1. 调用Executors类的静态工厂方法创建一个ExecutorService对象,该对象代表一个线程池
    2. 创建Runnable实现类或Callable实现类的实例,作为线程执行任务
    3. 调用ExecutorService对象的submit()方法来提交Runnable实例或Callable实例
    4. 当不想提交任何任务时,调用ExecutorService对象的shutdown()方法来关闭线程池

    下面的【例14_17】展示了如何使用线程池处理任务。

    【例14_17 线程池】

    Exam14_17.java

    1. import java.util.concurrent.*;
    2. class Task implements Runnable{
    3. @Override
    4. public void run() {
    5. try {
    6. for (int i=1;i<=5;i++){
    7. System.out.println(Thread.currentThread().getName()+":"+i);
    8. Thread.sleep(200);
    9. }
    10. }catch (InterruptedException e){
    11. e.printStackTrace();
    12. }
    13. }
    14. }
    15. public class Exam14_17 {
    16. public static void main(String[] args){
    17. ExecutorService pool = Executors.newFixedThreadPool(5);
    18. Task task1 = new Task();
    19. Task task2 = new Task();
    20. pool.submit(task1);//把任务提交给线程池
    21. pool.submit(task2);//把任务提交给线程池
    22. pool.shutdown();//①关闭线程池
    23. }
    24. }

    【例14_17】的main()方法中,把两个任务task1和task2提交给线程池,线程池就会立刻分配线程执行这两个任务,之后关闭线程池。【例14_17】的运行结果如图14-16所示。

    图14-16【例14_17】运行结果

    从图14-16可以看出:语句①在两个任务刚被提交给线程池的时候就调用shutdown()方法关闭线程池,但线程池中的线程仍然顺利的执行了两个任务。这是因为shutdown()方法的作用并不是停止线程的执行,而是“拒绝”新的任务进入池内。当线程池中的线程开始执行任务后,即使调用了shutdown()方法,线程池中的线程也不会中止执行,线程会全部执行完毕。

    14.8.2 FolkJoinPool线程池

    当今的计算机已经向着“多核化”的方向发展,很多计算机都有不止一个CPU。如果要以最快的速度执行完任务,最好的方式就是把一个任务分解成多个小任务分配给多个CPU执行,然后把每个小任务的执行结果组合成整个任务的执行结果。

    Java语言中,ForkJoinPool就是一种能把一个任务分解成多个小任务的线程池。ForkJoinPool是ExecutorService接口的实现类,它有以下两个构造方法:

    1. public ForkJoinPool(int parallelism)
    2. public ForkJoinPool()

    第一个构造方法创建一个包含parallelism个并行线程的线程池,而第二个构造方法也是创建一个线程池,它的并行线程数量是Runtime.getRuntime().availableProcessors()的返回值,这个返回值实际上就是系统逻辑CUP的数量。从JDK1.8开始,ForkJoinPool又增加了一个commonPool()静态方法来获得ForkJoinPool对象,通过这个方法获得的ForkJoinPool对象被称为通用池。通用池的运行状态不会受shutdown()或shutdownNow()方法的影响。但如果程序员调用System.exit(0)来终止虚拟机,通用池以及通用池中正在执行的任务都会被自动终止。

    创建了ForkJoinPool 实例之后,就可调用ForkJoinPool 的submit()或 invoke()方法来执行指定的任务了。这两个方法的参数类型都是ForkJoinTask,ForkJoinTask代表一个可以并行、合并的任务,它的fork()方法用于将新创建的子任务放入当前线程的工作队列中。ForkJoinTask是一个抽象类,并且它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,而RecursiveAction代表没有返回值的任务,下面的【例14_18】演示了如何把一个大任务拆分成小任务并交给ForkJoinPool线程池执行。

    【例14_18 ForkJoinPool线程池1】

    Exam14_18.java

    1. import java.util.concurrent.*;
    2. class PrintTask extends RecursiveAction{//可拆解的任务
    3. private static int THRESHOLD = 50;//每个小任务最多打印50个数字
    4. private int start;//要打印的第一个数字
    5. private int end;
    6. public PrintTask(int start,int end){
    7. this.start = start;
    8. this.end = end;
    9. }
    10. @Override
    11. protected void compute() {
    12. if(end-start//要打印的数字量小于等于50
    13. for (int i=start;i
    14. System.out.println(Thread.currentThread().getName()+":"+i);
    15. }
    16. }else{//要打印的数字量大于50
    17. //拆解任务
    18. int middle = (start+end)/2;
    19. PrintTask left = new PrintTask(start,middle);
    20. PrintTask right = new PrintTask(middle,end);
    21. //把两个拆分出的小任务放入工作队列
    22. left.fork();
    23. right.fork();
    24. }
    25. }
    26. }
    27. public class Exam14_18 {
    28. public static void main(String[] args) {
    29. ForkJoinPool pool = new ForkJoinPool();
    30. PrintTask task = new PrintTask(0,300);
    31. pool.submit(task);//把任务提交给线程池
    32. try {
    33. pool.awaitTermination(2, TimeUnit.SECONDS);//①等待2秒钟
    34. }catch (InterruptedException e){
    35. e.printStackTrace();
    36. }
    37. pool.shutdown();
    38. }
    39. }

    【例14_18】中,PrintTask表示一个可拆分的任务,它负责打印[start,end)这个区间内的整数。如果要打印的数字超过50个,则把任务拆分成两小任务,每个小任务各打印一半,如果每个小任务要打印的数字仍然超过50个,则继续拆分,直到任务小到打印数字不超过50个为止。需要注意:ForkJoinPool表示的线程池在调用shutdown()方法后会立刻结束池中线程的运行,因此语句①调用sleep()方法等待了2秒钟以保证池中线程全部执行完毕。【例14_18】的运行结果图14-17所示。

    图14-17【例14_18】运行结果

    由于运行结果很长,图14-17仅展示了运行结果的一部分。从图14-7可以看出:线程池启动了4个线程来完成这个任务,线程数量之所以是4,是因为线程数量一般与逻辑CPU数量相等,而逻辑CPU又是物理CPU数量的2倍。此外还可以看出:虽然打印了0~299这300个数字,但并不是按顺序连续打印的,这恰好证明了多个线程是并行执行的。

    【例14_18】中的任务没有返回值,如果一个任务是有返回值的,可以让任务类继承RecursiveTask,这个类是一个泛型类,类型参数就是返回值的类型。下面的【例14_19】展示了使用RecursiveTask类定义一个任务,这个任务要计算1~100累加和。

    【例14_19 ForkJoinPool线程池2】

    Exam14_19.java

    1. import java.util.concurrent.*;
    2. class AddTask extends RecursiveTask {//可拆解的任务
    3. private static int THRESHOLD = 20;//每个小任务最多累加20个数字
    4. private int start;
    5. private int end;
    6. public AddTask(int start,int end){
    7. this.start = start;
    8. this.end = end;
    9. }
    10. @Override
    11. protected Integer compute() {
    12. int sum = 0;
    13. if(end-start
    14. for (int i=start;i
    15. sum = sum + i;
    16. }
    17. return sum;
    18. }else{
    19. //拆解任务
    20. int middle = (start+end)/2;
    21. AddTask left = new AddTask(start,middle);
    22. AddTask right = new AddTask(middle,end);
    23. //把两个拆分出的小任务放入工作队列
    24. left.fork();
    25. right.fork();
    26. return left.join()+right.join();//把两个任务的累加和加起来
    27. }
    28. }
    29. }
    30. public class Exam14_19 {
    31. public static void main(String[] args) {
    32. //创建通用池
    33. ForkJoinPool pool = ForkJoinPool.commonPool();
    34. AddTask task = new AddTask(1,101);//①
    35. //把任务提交到线程池,并把运行结果保存到future中
    36. Future future = pool.submit(task);
    37. try {
    38. System.out.println("累加结果为:"+future.get());
    39. } catch (Exception e) {
    40. e.printStackTrace();
    41. }
    42. pool.shutdown();//关闭线程池
    43. }
    44. }

    【例14_19】与【例14_18】很相似,任务只是把打印数字改成了累加数字。需要注意:compute()方法在进行累加时不会把end这个数字加进去,所以语句①在给构造方法传递参数时第二个参数的值是101而不是100。此外,本例中的pool对象是一个通用池,因此在线程运行结束前调用shutdown()方法并不会终止池中线程的运行。【例14_19】的运行结果如图14-18所示。

    图14-18【例14_19】运行结果

    除阅读文章外,各位小伙伴还可以点击这里观看我在本站的视频课程学习Java!

  • 相关阅读:
    WhisperFusion:具有超低延迟无缝对话功能的AI系统
    AGI 远不止 ChatGPT!一文入门 AGI 通识及应用开发
    c++实验1
    GO 语言的并发编程相关知识点简介与测试【GO 基础】
    五月集训(第二十三日)字典树
    WSL Ubuntu20.04安装pycairo指南
    [机缘参悟-83]:如何自我前提应对可能的经-济-危-机?
    PHP:比较运算符
    代码随想录 -- 哈希表--两数之和
    论文解析[4] UNET 3+: A FULL-SCALE CONNECTED UNET FOR MEDICAL IMAGE SEGMENTATION
  • 原文地址:https://blog.csdn.net/shalimu/article/details/128111356