本文将会介绍Fork/Join框架的基本原理、设计思想、应用与实现等。
1 什么是Fork/Join框架
Fork/Join框架是Java 7提供的一种用于并行执行任务的框架,把大任务分解为若干小任务,最终通过合并每个小任务的结果得到大任务结果的框架。
Fork就是把一个大任务分解为若干个自认为并行的执行,Join就是最终合并这些子任务的执行结果,最后得到这个大任务的结果。eg:计算1+2+...+10000,可以分解为10个小任务,每个子任务求和1000个数字,最终汇总这10个子任务的结果,得到最终结果。
Fork/Join的运行流程图如下所示。
2 工作窃取算法
首先,我们需要来了解一下什么是工作窃取算法。工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于常见的一个大型任务,我们可以把这个大的任务切割成很多个小任务,然后这些小任务会放在不同的队列中,每一个队列都有一个相应的的工作执行线程来执行,当一个线程所需要执行的队列中,任务执行完之后,这个线程就会被闲置,为了提高线程的利用率,这些空闲的线程可以从其他的任务队列中窃取一些任务,来避免使自身资源浪费,这种在自己线程闲置,同时窃取其他任务队列中任务执行的算法,就是工作窃取算法。
在窃取任务线程和被窃取任务线程访问同一个队列时,为了避免出现线程之间的竞争,通常会使用双端队列这种结构,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从尾部拿任务线程。工作窃取算法的运行流程如图所示:
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列中只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。
3 Fork/Join框架的设计
在我们已经了解了Fork/Join框架的需求,如果让我们来设计一个Fork/Join框架,那么该如何设计呢?
- 分割任务。首先需要有一个fork类将大任务分割为小任务。但有可能小任务依然很大,需要进一步分割,直到分割出的子任务足够小。
- 执行子任务并合并结果。将分割得到的子任务放在多个双端队列中,然后启动线程分别从双端队列里获取任务执行。子任务执行完的结果均统一放在一个队列中,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join框架使用两个类来完成上述两件事情。
- ForkJoinTask:要使用ForkJoin框架,必须首先创阿金一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务。
2. FrokJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
任务分割出的子任务会添加到当前工作线程所维护的双端队列,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列尾部获取一个任务。
4 使用Fork/Join框架
通过一个简单的需求来使用Fork/Join框架,计算1+2+3+4的结果。因为有返回值,则需要继承RecursiveTask类。
首先要考虑的是如何分割任务,若希望每个子任务最多执行两个数的相加,那么设置分割的阈值为2,四个数字相加,就会把这个任务fork成两个任务。实现代码如下所示。
package chapter6;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 6362321471176096633L;
/**
* 阈值
*/
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
/*
* 结果值
*/
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if(canCompute) {
for(int i = start; i < end; i++) {
sum += i;
}
} else {
// 任务不够小,需要进行分割
int middle = (start + end)/2;
CountTask leftTask = new CountTask(start, middle);
CountTask rigthTask = new CountTask(middle+1, end);
// 执行子任务
leftTask.fork();
rigthTask.fork();
// 等待子任务执行完,得到其结果
int leftResult = leftTask.join();
int rightResult = rigthTask.join();
// 合并子任务
sum = rightResult + leftResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4
CountTask countTask = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(countTask);
try {
System.out.println(result.get());
} catch (Exception e) {
}
}
}
通过上述代码,我们可以大体了解到ForkJoinTask的使用方法。首先实现一个具体完成compute()方法的类,在类中指定阈值,完成具体的compute()函数的处理逻辑。在主函数中生成一个ForkJoinPool,用于提交任务。
ForkJoinTask与一般任务的主要区别在于他需要实现compute方法,在这个方法中,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,·看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果(个人认为是递归的思想)。使用join方法会等待子任务执行完并得到结果。
5 Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是无法在主线程中直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码。
if(countTask.isCompletedAbnormally()) {
System.out.println(countTask.getException());
}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或没有抛出异常,则返回null。
6 Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成。ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
- ForkJoinTask的fork方法实现原理
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。代码如下。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行工作。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this); // 唤醒或创建一个工作线程
}
else if (n >= m)
growArray();
}
}
- ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
首先,它调用了doJoin方法,通过doJoin方法得到当前任务的状态来判断是什么结果。任务状态有四种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16
首先调用reportException()方法来判断其任务是否为被取消状态或出现异常状态,若为这两种状态,则抛出异常。
否则调用getRawResult()方法直接返回任务结果。
本问简单介绍了Fork/Join框架的实现原理,但并未深入其底层代码实现进一步解释,如果有需要的同学,可以自己进一步研究其源代码,体会大师级的设计思路。