• 11 Fork/Join


    目录

    1 分治思想

    2 Fork/Join

    2.1 介绍

     2.2 应用场景

    1 递归分解型任务

    2 数组处理

    3 并行化算法

    4 大数据处理

    2.3 使用

    2.3.1 ForkJoinPool

            构造器

            任务提交方式

             与普通线程池对比

    2.3.2 ForkJoinTask

            调用方法

    2.3.3 处理递归任务

    2.3.4 处理阻塞任务

    2.4 原理

    2.4.1 工作线程

    2.4.2 工作窃取


    1 分治思想

            分治思想:规模为N的问题分解为K个规模的子问题,子问题相互独立且与原问题性质相同,求出子问题的解,就能得到原问题的解

            分治思想的步骤:

                    分解

                    求解

                    合并

    2 Fork/Join

    2.1 介绍

            并行计算框架,用来支持分治任务模型的,Fork对应的是分治任务模型里的任务分解Join对应的是结果合并

     2.2 应用场景

    1 递归分解型任务

            排序、归并、遍历等,通常可以将大的任务分解成若干子任务

    2 数组处理

            大型数组的排序、查找、统计等,拆成若干子数组,并行地处理每个子数组,最后合并成一个大的有序数组

    3 并行化算法

            并行化图像处理算法、并行化机器学习算法等,将任务拆分成若干子问题

    4 大数据处理

            大型日志文件处理、大型数据库查询等,将数据分成若干分片,并行处理每个分片

    2.3 使用

    主要组成:ForkJoinPool、ForkJoinTask

            ForkJoinPool:用于管理任务的执行

            ForkJoinTask:任务可以被分为得更小

    使用步骤:

            1 构建一个任务,需要继承RecursiveAction(无返回值)或RecursiveTask(有返回值),重写compute()方法来实现任务的执行逻辑,在方法中最后调用invokeAll开始执行任务

            2 构建forkJoin线程池,调用forkJoin.invoke()来提交任务

    2.3.1 ForkJoinPool

            用于管理Fork/Join任务的线程

            构造器

                    

                    int parallelism:指定并行级别,决定工作线程的数量,未设置则使用Runtime.getRuntime().availableProcessors()来设置并行级别

                    ForkJoinWorkerThreadFactory:在创建线程时,通过该factory创建,未设置使用默认的DefaultForkJoinWorkerThreadFactory负责线程的创建

                    UncaughtExceptionHandler:指定异常处理器,运气出错时会由设定的处理器处理

                    asyncMode:队列的工作模式,true=先进先出,false=后进先出

            任务提交方式

                    

             与普通线程池对比

                    工作窃取算法:普通线程池采用任务队列实现;FockJoinPool中的线程在执行完任务后,可以从其它线程的队列获取任务并执行

                    任务的分解和合并:ForkJoinPool可将一个大任务分解为多个小任务,并行地执行这些小任务,最终将其结果合并;而普通线程只能按提交的任务顺序一个一个地执行

                    工作线程的数量:ForkJoinPool根据当前系统的CPU核心数来自动设置工作线程的数量,以最大限度地发挥CPU性能优势;普通线程需要手动设置线程池大小,且要考虑其合理性

                    任务类型:ForkJoinPool适用于执行大规模任务并行化;普通线程池适用于执行一些短小的任务,如处理请求

    2.3.2 ForkJoinTask

            定义执行任务的基本接口

            通过继承ForkJoinTask类来实现自己的任务类,重写其中的compute()方法来定义任务的执行逻辑,实现时只需继承其子类:

                    RecursiveAction:递归执行但不需要返回结果

                    RecursiveTask:递归执行需要返回的结果

                    CountedCompleter:任务完成执行后,触发的自定义钩子函数

            调用方法

                    fork() ---- 提交任务

                            向当前任务所运行的线程池中提交任务;当前线程是ForkJoinWorkerThread类型,会放入该线程的工作队列,否则放入common线程池的工作队列中

                    join() ---- 获取任务执行结果

                            用于获取任务的执行结果;调用方法时,会阻塞当前线程直到对应的子任务完成运行并返回结果

    2.3.3 处理递归任务

    1. public class Fibonacci extends RecursiveTask {
    2. final int n;
    3. public Fibonacci(int n) {
    4. this.n = n;
    5. }
    6. @Override
    7. protected Integer compute() {
    8. if (n <= 1) {
    9. return n;
    10. }
    11. Fibonacci f1 = new Fibonacci(n - 2);
    12. f1.fork();
    13. Fibonacci f2 = new Fibonacci(n - 1);
    14. return f2.compute() + f1.join();
    15. }
    16. public static void main(String[] args) {
    17. ForkJoinPool pool = new ForkJoinPool();
    18. Fibonacci fibonacci = new Fibonacci(100);
    19. Integer result = pool.invoke(fibonacci);
    20. System.out.println(result);
    21. }
    22. }

            上述代码存在的问题:会导致程序运行时间长,递归深度过大时栈溢出等

            在使用ForkJoinPool处理递归任务时,特别要考虑递归深度和任务粒度,避免调度带来的内存消耗

    2.3.4 处理阻塞任务

            1 防止线程饥饿:当一个线程在执行阻塞任务时,会一直等待任务完成,这时如果没有其它线程能够窃取任务,那么线程将会一直阻塞;在ForkJoinPool中要避免提交大量阻塞任务(使用该线程池的目的是分治,大量阻塞任务会降低ForkJoinPool的使用价值和性能

            2 使用特定线程池:ThreadPoolExecutor,将其作为ForkJoinPool的执行器,让其执行阻塞任务;ForkJoinPool执行非阻塞任务

            3 不要阻塞工作线程:如果说一定要提交阻塞任务,确保任务不阻塞工作线程,否则会导致整个线程池性能下降,为了避免该情况,需要将阻塞任务提交到一个新的线程池中,或使用CompletableFuture等异步编程工具来处理阻塞任务

    1. public class BlockingTaskDemo {
    2. public static void main(String[] args) {
    3. ForkJoinPool pool = new ForkJoinPool();
    4. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    5. try {
    6. Thread.sleep(5);
    7. return "xxx";
    8. } catch (InterruptedException e) {
    9. return null;
    10. }
    11. }, pool);
    12. try {
    13. String result = future.get();
    14. System.out.println(result);
    15. } catch (Exception e) {
    16. pool.shutdown();
    17. }
    18. }
    19. }

    2.4 原理

            ForkJoinPool内部有多个任务队列(一个工作线程对应一个工作队列),当调用invoke、submit方法提交任务时,会根据一定路由规则将任务提交到任务队列中,任务执行过程中创建了子任务,则将子任务也提交到任务队列

            即使线程任务队列空了,也能通过“任务窃取”机制,去获得其它线程的任务并执行

    2.4.1 工作线程

            ForkJoinWorkerThread是ForkJoinPool中专门用于执行的线程,当该线程被创建时,会注册一个WorkQueue到ForkJoinPool中,是该线程专门用来存储自己任务的队列;被存储在WorkQueue[]的奇数位

            WorkQueue[]数组用于存储所有线程的WorkQueue;偶数位置存放外部线程提交的任务(这也是为什么CompletableFuture能够放,ForkJoinPool线程池的原因,偶数位置的WorkQueue[]保存线程池的一般逻辑)

            在ForkJoinPool中,WorkQueue[]奇数位才属于ForkJoinWorkerThread,窃取任务也是在该位置上窃取

    2.4.2 工作窃取

            允许空闲线程从WorkQueue[]的奇数位获取任务

            push、pop:当前线程操作;poll:窃取其它线程的WorkQueue

  • 相关阅读:
    Linux系统函数之文件系统管理
    【开源】串口/蓝牙/TCP/UDP调试工具SerialTest
    Java数据结构——第十二节 - Map和Set
    在使用npm安装插件时,npm报错ERESOLVE
    客观看待mybatis 中使用 where 1=1
    C#NET6基于MailKit 进行邮件发送通知
    数据结构(Java):顺序表&集合类ArrayList
    svg学习
    安装oem 13c
    附下载 | 图解密评联委会《商用密码应用安全性评估FAQ(第二版)》
  • 原文地址:https://blog.csdn.net/m0_61253315/article/details/134003740