以下示例来自网络。
package com.wjxie.test.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
* 并行计算 1 到 n 之和。
*
* Created by wenjing.xie on 2016年4月20日
*/
public class Calculator extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 100;
private int start;
private int end;
public Calculator(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
Calculator left = new Calculator(start, middle);
Calculator right = new Calculator(middle + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 继承关系: RecursiveTask --> ForkJoinTask --> Future
Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));
System.out.println("当前活跃线程数: " + Thread.getAllStackTraces().size());
System.out.println(result.get());
}
}
package com.wjxie.test.forkjoin;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
/**
* 并行排序数组。
*
* Created by wenjing.xie on 2016年4月20日
*/
public class SortTask extends RecursiveAction {
private static final long serialVersionUID = 1L;
final long[] array;
final int start;
final int end;
private int THRESHOLD = 100;
public SortTask(long[] array) {
this.array = array;
this.start = 0;
this.end = array.length - 1;
}
public SortTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
protected void compute() {
if (end - start < THRESHOLD)
sequentiallySort(array, start, end);
else {
int pivot = partition(array, start, end);
new SortTask(array, start, pivot - 1).fork();
new SortTask(array, pivot + 1, end).fork();
}
}
/**
* 取数组 array[start-end] 的中间数的下标,并使得 array[pivot] 左边的元素比自己小,右边的比自己大。
*/
private int partition(long[] array, int start, int end) {
long x = array[end];
int i = start - 1;
for (int j = start; j < end; j++) {
if (array[j] <= x) {
i++;
swap(array, i, j);
}
}
swap(array, i + 1, end);
return i + 1;
}
private void swap(long[] array, int i, int j) {
if (i != j) {
long temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
private void sequentiallySort(long[] array, int lo, int hi) {
Arrays.sort(array, lo, hi + 1);
}
public static void main(String[] args) throws InterruptedException {
int SIZE = 10000;
ForkJoinPool forkJoinPool = new ForkJoinPool();
Random rnd = new Random();
long[] array = new long[SIZE];
for (int i = 0; i < SIZE; i++) {
array[i] = rnd.nextInt();
}
forkJoinPool.submit(new SortTask(array));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);
for (long item : array) {
System.out.println(item);
}
}
}