版权声明: https://blog.csdn.net/Dongguabai/article/details/85235005
Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架。它采用类似分治算法的思想:把一个复杂的问题拆分成两个或更多相同或相似的子问题,直到子问题可以简单地直接求解,原问题的解即各子问题解的合并。
Fork/Join 的使用也很简单,一些 API 与线程池类似。使用时需要注意的是要合理地“分而治之”。
Fork/Join 框架最常用的就是这四个类:ForkJoinPool、ForkJoinTask、RecursiveAction 和 RecursiveTask。
RecursiveAction 和 RecursiveTask 继承了 ForkJoinTask 抽象类:
RecursiveAction 是没有返回值的,RecursiveTask 是有返回值的。
看一个有返回值的小 Demo:
package test.demo2.deadlock;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;
/**
 * Fork/Join task that sums every integer in the closed range {@code [start, end]}.
 *
 * <p>A range whose width ({@code end - start}) exceeds the split threshold is halved: one half
 * is forked, the other is computed on the current thread, and the two partial sums are added.
 * Sub-tasks inherit the threshold of the task that created them.
 *
 * <p>The sum is accumulated in {@code int} arithmetic, so a range whose sum does not fit in an
 * {@code int} wraps around silently. An empty range ({@code start > end}) sums to {@code 0}.
 *
 * @author Dongguabai
 * @date 2018/12/24 16:55
 */
public class CalculatedForkJoinRecursiveTask extends RecursiveTask<Integer> {

    /** Split threshold used when the caller does not supply one. */
    private static final int DEFAULT_MAX_THRESHOLD = 3;

    /** First value of the range (inclusive). */
    private final int start;

    /** Last value of the range (inclusive). */
    private final int end;

    /** Largest range width ({@code end - start}) that is summed directly instead of split. */
    private final int maxThreshold;

    /**
     * Creates a task over {@code [start, end]} with the default split threshold of 3.
     *
     * @param start first value of the range (inclusive)
     * @param end last value of the range (inclusive)
     */
    public CalculatedForkJoinRecursiveTask(int start, int end) {
        this(start, end, DEFAULT_MAX_THRESHOLD);
    }

    /**
     * Creates a task over {@code [start, end]} with an explicit split threshold.
     *
     * @param start first value of the range (inclusive)
     * @param end last value of the range (inclusive)
     * @param maxThreshold largest range width that is summed directly; zero or negative values
     *     make the task split all the way down to single-element ranges
     */
    public CalculatedForkJoinRecursiveTask(int start, int end, int maxThreshold) {
        this.maxThreshold = maxThreshold;
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range, splitting it recursively while it is wider than the threshold.
     *
     * @return the sum of all integers in {@code [start, end]} (wrapping on {@code int} overflow)
     */
    @Override
    protected Integer compute() {
        // Ranges with at most one element are always a base case, so a zero or negative
        // threshold cannot cause endless splitting. The width is computed as a long because
        // end - start can overflow int.
        if (start >= end || (long) end - start <= maxThreshold) {
            return IntStream.rangeClosed(start, end).sum();
        }

        // Floor midpoint computed in long: no overflow, and start <= middle < end holds even
        // for negative ranges, so both halves are strictly smaller than this range.
        int middle = (int) (((long) start + end) >> 1);

        // Children inherit this task's threshold rather than falling back to the default.
        CalculatedForkJoinRecursiveTask left =
                new CalculatedForkJoinRecursiveTask(start, middle, maxThreshold);
        CalculatedForkJoinRecursiveTask right =
                new CalculatedForkJoinRecursiveTask(middle + 1, end, maxThreshold);

        // Fork one half and compute the other on this thread, so the current worker does
        // useful work instead of only waiting on two forked children.
        left.fork();
        int rightSum = right.compute();
        return left.join() + rightSum;
    }

    /**
     * Demo entry point: sums the range {@code [0, 2]} on a dedicated pool and prints the result.
     *
     * @param args unused
     * @throws ExecutionException if the computation fails
     * @throws InterruptedException if the main thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Integer> future =
                    forkJoinPool.submit(new CalculatedForkJoinRecursiveTask(0, 2));
            System.out.println(future.get());
        } finally {
            // Release the pool's worker threads once the demo is finished.
            forkJoinPool.shutdown();
        }
    }
}
使用 RecursiveAction:
package test.demo2.deadlock;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
 * Fork/Join action that adds every integer in the closed range {@code [start, end]} to a
 * shared accumulator.
 *
 * <p>A range wider than the split threshold is halved and both halves are run to completion
 * before {@link #compute()} returns, so once the root action has completed the accumulator
 * holds its full contribution.
 *
 * <p>The accumulator is a static field shared by every instance of this class and is never
 * reset, so the totals of successive runs add up.
 *
 * @author Dongguabai
 * @date 2018/12/24 16:55
 */
public class CalculatedForkJoinRecursiveAction extends RecursiveAction {

    /** Running total; shared by all actions of this class. */
    private static final AtomicInteger RESULT = new AtomicInteger();

    /** First value of the range (inclusive). */
    private final int start;

    /** Last value of the range (inclusive). */
    private final int end;

    /** Largest range width ({@code end - start}) that is summed directly instead of split. */
    private final int maxThreshold = 3;

    /**
     * Creates an action over {@code [start, end]}.
     *
     * @param start first value of the range (inclusive)
     * @param end last value of the range (inclusive)
     */
    public CalculatedForkJoinRecursiveAction(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Demo entry point: sums the range {@code [0, 10]} on a dedicated pool and prints the
     * accumulated total.
     *
     * @param args unused
     * @throws ExecutionException declared for compatibility with existing callers
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // invoke() returns only after the root action has completed; because compute()
            // waits for its children, the whole task tree is finished at that point. This
            // replaces a fixed one-second wait that did not guarantee completion.
            forkJoinPool.invoke(new CalculatedForkJoinRecursiveAction(0, 10));
            System.out.println(RESULT);
        } finally {
            // Release the pool's worker threads once the demo is finished.
            forkJoinPool.shutdown();
        }
    }

    /**
     * Adds the sum of the range to the shared total, splitting the range recursively while it
     * is wider than the threshold.
     */
    @Override
    protected void compute() {
        // The width is computed as a long because end - start can overflow int.
        if ((long) end - start <= maxThreshold) {
            RESULT.addAndGet(IntStream.rangeClosed(start, end).sum());
            return;
        }

        // Floor midpoint computed in long to avoid int overflow of start + end.
        int middle = (int) (((long) start + end) >> 1);

        // invokeAll runs both halves and returns only when both are done, so completion of
        // this action implies completion of every sub-action below it.
        invokeAll(
                new CalculatedForkJoinRecursiveAction(start, middle),
                new CalculatedForkJoinRecursiveAction(middle + 1, end));
    }
}
参考资料: