• Java多线程开发系列之六:无限分解流----Fork/Join框架


    Fork译为拆分,Join译为合并
    Fork/Join框架的思路是把一个非常巨大的任务,拆分成若然的小任务,再由小任务继续拆解。直至达到一个相对合理的任务粒度。然后执行获得结果,然后将这些小任务的结果汇总,生成大任务的结果,
    直至汇总成最初巨大任务的结果。如下图:

    红色箭头代表拆分子任务。
    绿色箭头代表返回子任务结果
    这个框架的思路听起来,其实用传统的线程池、多线程完全就可以解决。但是内部却有很多小的细节(后边会说到),再加上清晰的使用思路,让这个框架还是在多线程并发中,占有了一席之地。
    Fork/Join框架下,我们常用到三个类:(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )
    RecursiveAction,子任务类,支持子任务有返回结果任务
    RecursiveTask,子任务类,用于有返回结果的任务
    ForkJoinPool,执行子任务的线程池。
    话不多说,我们直接看代码:

    复制代码
     1 public class SumDemo extends RecursiveTask {
     2 
     3     int maxLen = 800_0000;
     4 
     5     int[] arr;
     6     int start;
     7     int end;
     8 
     9 
    10     public SumDemo(int[] arr, int start, int end) {
    11         this.arr = arr;
    12         this.start = start;
    13         this.end = end;
    14     }
    15 
    16     @Override
    17     protected Long compute() {
    18         if (end - start < maxLen) {
    19             long a = sum();
    20             try {
    21                 //Thread.sleep(1);
    22             } catch (Exception e) {
    23             }
    24             return a;
    25         }
    26         int middle = (start + end) / 2;
    27         SumDemo left = new SumDemo(arr, start, middle);
    28         SumDemo right = new SumDemo(arr, middle + 1, end);
    29         left.fork();
    30         right.fork();
    31         //invokeAll(left,right);
    32         long leftRtn = left.join();
    33         long rightRtn = right.join();
    34         return leftRtn + rightRtn;
    35     }
    36 
    37     private Long sum() {
    38         System.out.println("now" + Thread.currentThread().getName() + "-start:" + start + "-end:" + end);
    39         long sum = 0;
    40         for (int i = start; i <= end; i++) {
    41             sum += arr[i];
    42         }
    43         return sum;
    44     }
    45 
    46     public static void main(String[] args) throws ExecutionException, InterruptedException {
    47         int size = 30000_0000;
    48         int[] arr = new int[size];
    49         Random random = new Random(0);
    50         for (int i = 0; i < size; i++) {
    51             arr[i] = random.nextInt(10_0000_0000);
    52         }
    53         long cal = 0;
    54         long start = System.currentTimeMillis();
    55         for (int i = 0; i < size; i++) {
    56             if (i % 800_0000 == 0) {
    57                 Thread.sleep(1);
    58             }
    59             cal += arr[i];
    60         }
    61         long finish = System.currentTimeMillis();
    62         long timeCost = finish - start;
    63         System.out.println("cal" + cal);
    64         long start1 = System.currentTimeMillis();
    65         ForkJoinPool forkJoinPool = new ForkJoinPool();
    66         ForkJoinTask result = forkJoinPool.submit(new
    67                 SumDemo(arr, 0, size - 1));
    68         long rtn = result.get();
    69         long finish1 = System.currentTimeMillis();
    70         long forkJoinCost = finish1 - start1;
    71         System.out.println("one thread  cost" + (timeCost));
    72         System.out.println("fork join cost" + forkJoinCost);
    73     }
    74 }
    复制代码

    执行的结果大概是这样的

    复制代码
     1 cal150000314007254036
     2 nowForkJoinPool-1-worker-1-start:0-end:4687499
     3 nowForkJoinPool-1-worker-3-start:187500000-end:192187499
     4 nowForkJoinPool-1-worker-5-start:37500000-end:42187499
     5 nowForkJoinPool-1-worker-6-start:225000000-end:229687499
     6 .....
     7 nowForkJoinPool-1-worker-3-start:220312500-end:224999999
     8 nowForkJoinPool-1-worker-7-start:267187500-end:271874999
     9 nowForkJoinPool-1-worker-2-start:107812500-end:112499999
    10 nowForkJoinPool-1-worker-4-start:281250000-end:285937499
    11 nowForkJoinPool-1-worker-7-start:271875000-end:276562499
    12 nowForkJoinPool-1-worker-5-start:135937500-end:140624999
    13 nowForkJoinPool-1-worker-11-start:140625000-end:145312499
    14 nowForkJoinPool-1-worker-6-start:276562500-end:281249999
    15 nowForkJoinPool-1-worker-4-start:285937500-end:290624999
    16 nowForkJoinPool-1-worker-11-start:145312500-end:149999999
    17 nowForkJoinPool-1-worker-7-start:290625000-end:295312499
    18 nowForkJoinPool-1-worker-4-start:295312500-end:299999999
    19 one thread cost136
    20 fork join cost67
    复制代码

    线程池默认大小是根据cpu当前的可用核数来作为大小的,我们这里是12核,但是12核居然只比单一线程用时少50%,这是挺奇怪的,这主要是由于我们Demo中的任务是连续的计算密集型任务,这种情况下单一线程的表现也很优秀,forkJoin反而由于要不断协调线程

    任务而导致会损耗性能,所以差距并不明显。倘若放开注释中的睡眠时间,则两者的差距会拉开的非常大,如下:

    1 one thread  cost675
    2 fork join cost194

    代码的思路大概是这样的:

    我们先定义一个子任务类,子任务类设置一个阈值,子任务开始任务时会判断:
    如果计算量未超过阈值呢,说明任务足够小,我们当前子任务直接就执行计算了。
    如果计算量超过阈值,说明任务比较大我们需要进行拆分,此时创建好拆分子任务,并使用fork()方法即可。拆分后的子任务,则后续使用join等待结果即可。
    这样通过Fork/Join框架实现大任务的计算就算是搞定了。(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )

    那既然是线程池,是如何协调线程来计算子任务的呢?

    (1)与传统线程池共享一个任务队列不同的是,Fork/Join框架中,每个子任务都有一个属于自己线程的任务队列(但是两者其实并不是一对一的关系,源码很复杂),如下图:

    这样肯定会由于任务规模、计算难度的不同,导致有些线程很快执行完了,其它线程还有很长的任务队列,那怎么办呢?
    Fork/Join框架会让任务已经完成的线程,从其它任务的队列的尾端去取任务,这样一方面加速了任务的完成,一方面又减少了线程由于并发操作队列可能存在的并发问题。
    这种方式,我们也将它称为“工作窃取”如下图:

    (2)Fork出来的子任务被谁执行了:
    通过阅读源码我们可以发现,如果当前线程是线程池线程,则直接把fork出的子任务丢到当前线程的队列中,否则会通过计算随机的提交到其他的线程所拥有的的队列中。由其他线程来完成。

    复制代码
    1     public final ForkJoinTask fork() {
    2         Thread t;
    3         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    4             ((ForkJoinWorkerThread)t).workQueue.push(this);
    5         else
    6             ForkJoinPool.common.externalPush(this);
    7         return this;
    8     }
    复制代码

     

  • 相关阅读:
    十二、SpringBoot文件上传使用及流程分析【文件上传参数解析器】
    电子沙盘数字沙盘大数据人工智能开发教程第16课
    学习Vue的入门方法有哪些呢?
    【java】JVM类加载机制
    Java设计模式8,校验、审批流程改善神器,责任链模式
    Android Jetpack简介
    用免费GPU部署自己的stable-diffusion-学习笔记
    P06 DDL
    java毕业生设计学生选课咨询系统计算机源码+系统+mysql+调试部署+lw
    PowerDesigner 16 导入表结构与生成 Html
  • 原文地址:https://www.cnblogs.com/jilodream/p/17480266.html