java 多线程系列文章列表, 请查看目录: 《java 多线程学习笔记》
1. ForkJoinPool
java7 提供了ForkJoinPool 来支持将一个任务采用递归方式分成多个小任务计算, 然后再把小任务的计算结果合并为总的结果.
1.1 ForkJoinPool 的创建方式
- java7 提供了两个构造器:
- ForkJoinPool(int paralleism): 创建时指定并行线程数
- ForkJoinPool(): 以当前机器的cpu个数(Runtime.avaliableProcessors())作为并行参数
- java8 新增了静态方法创建通用池:
- ForkJoinPool.commomPool(): 返回一个通用池, 通用池状态不会受shutdown()和shutdownNow()方法影响.
- int getCommonPoolParallelism(): 返回通用池的并行线程数
1.2 子任务接口
ForkJoin 子任务有两种, 一种是没有返回值的, 一种是有返回值的. java 分别提供了对应的接口:
- RecursiveAction: 定义无返回值子任务应该实现的接口
- RecursiveTask: 定义有返回值子任务应该实现的接口
2. 无返回值ForkJoinPool 用法示例
2.1 创建子任务类
- 无返回值子任务需要集成RecursiveAction
- 需要指定拆分标准THRESHOLD, 明确开始计算的临界值
- 采用二分法进行子任务拆分
public class PrintTask extends RecursiveAction {
// 子任务拆分的标准
private static final int THRESHOLD = 3;
// 子任务处理的左临界值
private int start;
// 子任务处理的右临界值
private int end;
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 如果起始值和结束值之间的差小于THRESHOLD, 则开始打印
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + "--" + i);
};
}else {
// 二分法拆分子任务
int middle = (start + end)/2;
// 创建递归线程任务
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
// 并行执行递归任务
left.fork();
right.fork();
}
}
public static void main(String[] args) throws InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(new PrintTask(0, 30));
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
forkJoinPool.shutdown();
}
}
2.2 测试输出
ForkJoinPool-1-worker-2--13
ForkJoinPool-1-worker-2--14
ForkJoinPool-1-worker-2--11
ForkJoinPool-1-worker-2--12
ForkJoinPool-1-worker-2--9
ForkJoinPool-1-worker-2--10
ForkJoinPool-1-worker-2--7
ForkJoinPool-1-worker-2--8
ForkJoinPool-1-worker-2--20
ForkJoinPool-1-worker-2--21
ForkJoinPool-1-worker-2--18
ForkJoinPool-1-worker-2--19
ForkJoinPool-1-worker-2--16
ForkJoinPool-1-worker-2--17
ForkJoinPool-1-worker-2--15
ForkJoinPool-1-worker-2--24
ForkJoinPool-1-worker-2--25
ForkJoinPool-1-worker-2--22
ForkJoinPool-1-worker-2--23
ForkJoinPool-1-worker-2--26
ForkJoinPool-1-worker-2--27
ForkJoinPool-1-worker-2--1
ForkJoinPool-1-worker-2--2
ForkJoinPool-1-worker-2--0
ForkJoinPool-1-worker-2--3
ForkJoinPool-1-worker-2--4
ForkJoinPool-1-worker-1--28
ForkJoinPool-1-worker-1--29
ForkJoinPool-1-worker-3--5
ForkJoinPool-1-worker-3--6
2. 有返回值ForkJoinPool 用法示例
2.1 子任务
- 有返回值子任务需要继承类 RecursiveTask
- 需要指定拆分标准THRESHOLD, 明确开始计算的临界值
- 使用join()方法等待子线程开始完成
public class CalTask extends RecursiveTask<Integer> {
// 子任务拆分的标准
private static final int THRESHOLD = 10;
// 子任务处理的左临界值
private int start;
// 子任务处理的右临界值
private int end;
// 待计算数组
private List<Integer> array;
public CalTask(List<Integer> list, int start, int end) {
this.start = start;
this.end = end;
this.array = list;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果起始值和结束值之间的差小于THRESHOLD, 则开始打印
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
sum += array.get(i);
};
return sum;
}else {
// 二分法拆分子任务
int middle = (start + end)/2;
// 创建递归线程任务
CalTask left = new CalTask(array, start, middle);
CalTask right = new CalTask(array, middle, end);
// 并行执行递归任务
left.fork();
right.fork();
// 合并子任务计算结果
return left.join() + right.join();
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建数组
List<Integer> list = new ArrayList<>();
for (int i = 0; i <= 100; i++) {
list.add(i);
}
// 创建ForkJoinPool 线程池
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// 提交多线程任务
ForkJoinTask<Integer> task = forkJoinPool.submit(new CalTask(list, 0, list.size()));
// 获取返回结果
Integer result = task.get();
System.out.println("1+2+3+...+" + list.get(list.size() -1) + "=" + result);
}
}
3.2 测试输出
1+2+3+...+100=5050