Fork/Join框架是java7提供的一个用于执行并行任务的框架,是一个把大部分任务分割成若干个小任务,最终汇总每个小任务结果后,得到大任务结果的框架。
工作窃取算法是指某个线程从其他队列里窃取任务来执行。为什么要使用这个工作窃取算法呢。首先Fork/Join框架是把一个大任务分为了好多个子任务,然后把这些子任务,交给不同的线程去执行。这个时候就会有一个问题。不同的线程,执行的速度肯定是不一样的,有的线程执行的快,有的线程执行的慢,这个就会导致有的线程任务已经执行完了,但是有的线程,还会有很多的任务。但是执行完任务的线程,也不能空着啊,不然就是浪费效率。所以,这个时候就会用到工作窃取算法,做完任务的线程,会去没做完任务的线程那里,窃取一些线程,来进行工作。如下图,是工作窃取算法的运行流程图。
线程1为做完任务的线程,它窃取线程2的线程的时候,是从尾部窃取的。线程2 是被窃取的线程,它是从头部取线程的。
工作窃取算法的缺点:在某些情况下还是存在竞争的,比如在双端队列里只有一个任务的时候。并且该线程消耗了更多的系统资源,比如创建了多个线程和多个双端队列。
Fork/Join框架的设计总体大致分为两个步骤
1.分割任务。首先需要一个fork类把大任务分割为子任务,有可能子任务还是很大,所以需要不停的分割,直到分割出的子任务足够小。
2.执行任务并合并结果: 分割的子任务放在双端队列里面,然后创建多线程去双端队列里面拿任务执行。子任务的结果统一放在一个队列里面,启动一个线程从队列里面拿数据,然后合并这些数据。
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法捕获异常。
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成。ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
1.ForkJoinTask的fork方法实现原理
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的push方法异步的执行这个任务(就是把任务都放到队列里面),然后立即返回结果。代码如下:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
push方法会把当前任务存放在ForkJoinTask数组队列里。然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
2.ForkJoinTask的join方法主要作用时阻塞当前线程,并等待获取结果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
这里我们就不详细的介绍源码了,内容太多了,我这里说下这段代码的大概意思。大致就是通过dojoin方法得到当前任务状态,然后根据任务状态,来判断返回什么结果。
这种拆分的思想十分的常见,大数据中的mapreduce,采用的也是这种思想。都是将一个大的任务分为小的任务,然后计算每个小任务的结果,最后合并。因此,当我们在java中遇到一个十分的复杂的任务的时候,不妨考虑下使用Fork/Join框架。