• java之Fork/Join框架


    前言

    Fork/Join框架是java7提供的一个用于执行并行任务的框架,是一个把大部分任务分割成若干个小任务,最终汇总每个小任务结果后,得到大任务结果的框架。

    工作窃取算法

    工作窃取算法是指某个线程从其他队列里窃取任务来执行。为什么要使用这个工作窃取算法呢。首先Fork/Join框架是把一个大任务分为了好多个子任务,然后把这些子任务,交给不同的线程去执行。这个时候就会有一个问题。不同的线程,执行的速度肯定是不一样的,有的线程执行的快,有的线程执行的慢,这个就会导致有的线程任务已经执行完了,但是有的线程,还会有很多的任务。但是执行完任务的线程,也不能空着啊,不然就是浪费效率。所以,这个时候就会用到工作窃取算法,做完任务的线程,会去没做完任务的线程那里,窃取一些线程,来进行工作。如下图,是工作窃取算法的运行流程图。
    在这里插入图片描述
    线程1为做完任务的线程,它窃取线程2的线程的时候,是从尾部窃取的。线程2 是被窃取的线程,它是从头部取线程的。

    工作窃取算法的缺点:在某些情况下还是存在竞争的,比如在双端队列里只有一个任务的时候。并且该线程消耗了更多的系统资源,比如创建了多个线程和多个双端队列。

    Fork/Join框架的设计

    Fork/Join框架的设计总体大致分为两个步骤
    1.分割任务。首先需要一个fork类把大任务分割为子任务,有可能子任务还是很大,所以需要不停的分割,直到分割出的子任务足够小。
    2.执行任务并合并结果: 分割的子任务放在双端队列里面,然后创建多线程去双端队列里面拿任务执行。子任务的结果统一放在一个队列里面,启动一个线程从队列里面拿数据,然后合并这些数据。

    Fork/Join框架的异常处理

    ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法捕获异常。

    Fork/Join框架的实现原理

    ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成。ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

    1.ForkJoinTask的fork方法实现原理
    当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的push方法异步的执行这个任务(就是把任务都放到队列里面),然后立即返回结果。代码如下:

      public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    push方法会把当前任务存放在ForkJoinTask数组队列里。然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。

    final void push(ForkJoinTask<?> task) {
                ForkJoinTask<?>[] a; ForkJoinPool p;
                int b = base, s = top, n;
                if ((a = array) != null) {    // ignore if queue removed
                    int m = a.length - 1;     // fenced write for task visibility
                    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                    U.putOrderedInt(this, QTOP, s + 1);
                    if ((n = s - b) <= 1) {
                        if ((p = pool) != null)
                            p.signalWork(p.workQueues, this);
                    }
                    else if (n >= m)
                        growArray();
                }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2.ForkJoinTask的join方法主要作用时阻塞当前线程,并等待获取结果

    public final V join() {
            int s;
            if ((s = doJoin() & DONE_MASK) != NORMAL)
                reportException(s);
            return getRawResult();
    }
    private int doJoin() {
            int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
            return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                tryUnpush(this) && (s = doExec()) < 0 ? s :
                wt.pool.awaitJoin(w, this, 0L) :
                externalAwaitDone();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里我们就不详细的介绍源码了,内容太多了,我这里说下这段代码的大概意思。大致就是通过dojoin方法得到当前任务状态,然后根据任务状态,来判断返回什么结果。

    总结

    这种拆分的思想十分的常见,大数据中的mapreduce,采用的也是这种思想。都是将一个大的任务分为小的任务,然后计算每个小任务的结果,最后合并。因此,当我们在java中遇到一个十分的复杂的任务的时候,不妨考虑下使用Fork/Join框架。

  • 相关阅读:
    postman环境变量的设置
    顺应潮流,解放双手,让ChatGPT不废话直接帮忙编写可融入业务可运行的程序代码(Python3.10实现)
    Oracle中执行动态SQL
    若依微服务部署,裸服务部署、docker部署、k8s部署
    什么是RFP?什么又是CFP?两者的含金量该如何看?
    css怎样进行预处理
    RAID5同时挂2块磁盘,你见过吗?
    jsp+springmvc的校园失物招领管理平台
    预测股票涨跌看什么指标,如何预测明天股票走势
    2018架构真题&案例(四十九)
  • 原文地址:https://blog.csdn.net/qq_41820066/article/details/127871987