• Fork/Join实战和原理分析


    代码案例

    package thread;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    public class ForkJoinExample {
        //针对一个数字,做计算
        private static final Integer MAX = 200;
    
        static class CalcForJoinTask extends RecursiveTask<Integer>{
    
            private Integer startValue; //子任务开始计算的值
            private Integer endValue;   //子任务结束计算的值
    
            public CalcForJoinTask(Integer startValue, Integer endValue) {
                this.startValue = startValue;
                this.endValue = endValue;
            }
    
            //运算过程
            @Override
            protected Integer compute() {
                // 如果条件成立,说明这个任务所需要计算的数值拆分得足够小了,不需要再拆分可以正式进行累加计算了
                if(endValue - startValue < MAX){
                    System.out.println("开始计算:startValue:"+startValue+";endValue:"+endValue);
                    Integer totalValue = 0;
                    for (int i = this.startValue; i <= this.endValue;i++){
                       totalValue += i;
                    }
                    return totalValue;
                }
                //否则,对数字进行拆分,拆分成两个任务计算
                CalcForJoinTask subTask = new CalcForJoinTask(startValue,(startValue+endValue)/2);
                subTask.fork();
                CalcForJoinTask calcForJoinTask = new CalcForJoinTask((startValue+endValue)/2+1,endValue);
                calcForJoinTask.fork();
                return subTask.join() + calcForJoinTask.join();
            }
        }
    
        public static void main(String[] args) {
            //通过ForkJoinPool来执行ForkJoinTask
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Integer> taskFuture = pool.submit(new CalcForJoinTask(1,10000));
            try {
                Integer result =  taskFuture.get();
                System.out.println("result:"+result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    工作流程图

    fork/join的原理,可以通过一个图形来理解。整体思想其实就是拆分与合并。也就是分而治之。
    在这里插入图片描述
    图中最顶层的任务使用submit方式被提交到Fork/Join框架中,Fork/Join把这个任务放入到某个线程中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。如果当前任务需要累加的数字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个子任务各自负责计算一半的数据累加,请参见代码中的fork方法。如果当前子任务中需要累加的数字范围足够小(小于等于200),就进行累加然后返回到上层任务中。

    Fork/Join API代码分析

    上面的例子中涉及到几个重要的API, ForkJoinTask ,ForkJoinPool 。
    ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join操作,常用的三个子类

    • RecursiveAction : 无结果返回的任务
    • RecursiveTask : 有返回结果的任务
    • CountedCompleter :无返回值任务,完成任务后可以触发回调

    ForkJoinTask提供了两个重要的方法:

    • fork : 让task异步执行
    • join : 让task同步执行,可以获取返回值

    ForkJoinPool :专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务)

    方法名说明
    invoke(ForkJoinTask t)提交任务并一直阻塞直到任务执行完成返回合并结果。
    execute(ForkJoinTask t)异步执行任务,无返回值。
    submit(ForkJoinTask t)异步执行任务,返回task本身,可以通过task.get()方法获取合并之后结果。

    ForkJoinTask 在不显示使用 ForkJoinPool.execute/invoke/submit() 方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。

    工作队列

    • 双端队列
    • 工作窃取
    public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    每次调用fork异步执行任务的时候,都会把任务本身放到workQueue双端队列(后进先出)里面,而且是线程私有的,等于是每一个线程里面都有一个双端队列。执行的时候也是每次从双端队列取出一个任务执行计算依次进行。
    在这里插入图片描述
    工作窃取的意思就是 ForkJoinWorkerThread-1和 ForkJoinWorkerThread-2两个双端队列,执行任务有快有慢,肯定有先执行完了了,比如ForkJoinWorkerThread-2先执行完所有的任务了,ForkJoinWorkerThread-1还有很多任务没有执行,ForkJoinWorkerThread-1是从队头取任务执行的,那么ForkJoinWorkerThread-2是不是可以从ForkJoinWorkerThread-1双端队列的队尾窃取任务到ForkJoinWorkerThread-2进行执行。(其实双端队列也可以减少线程的竞争,降低资源消耗)

  • 相关阅读:
    元服务那些事儿 | 舞刀解决隐私声明,斩断上架牵绊
    《大数据面试通关》(第十四讲)——10 大业务场景 500 个离线实时指标
    如何在Ubuntu 18.04上安装Go并设置本地编程环境
    多标签分类损失函数/精度 BCEWithLogitsLoss MultiLabelSoftMarginLoss BCELoss
    ns3-gym入门(三):在opengym基础上实现一个小小的demo
    IDC Incast 的不彻底解决
    Tomcat部署与优化
    使用Redis将单机登录改为分布式登录
    classification_report
    共享内存 - 多进程编程(三)
  • 原文地址:https://blog.csdn.net/xiaowanzi_zj/article/details/125610955