并发编程(五)Fork/Join
Frok/Join模型主要为了处理分治任务。
什么事分治思想?
其实就是分而治之,将大的任务分解成无数个小的任务,直到子问题可以直接求出结果为止。
分治思想的条件就是大任务和小任务除了数据量不一样之外,其他的都是一摸一样的。
分治思想的问题,基本上都会用递归来实现。
1.1 java并发包中的并行计算框架 Fork/Join
分治任务模型主要分为分解和合并。对应的就是Fork/Join中的Fork和Join。Fork负责分解任务,Join负责合并任务。
分治任务也是多线程的操作,也有着自己的线程池,就是ForkJoinPool,他的默认线程数是CPU的核数。
和ThreadPoolExecutor一样ForkJoinPool也是一个生产者消费者模型实现的的线程池。
不一样的是ForkJoinPool中有多个任务队列,而且在一个任务队列空的情况下,还可以进行任务窃取来获取其他任务队列的任务,因为ForkJoinPool的任务队列都是双向队列,正常的获取任务和任务窃取分别是从任务队列的不同端进行的操作,不会造成没有必要的数据竞争,这样以来所有的工作线程都不会停止下来了。
ForkJoinPool使用java官方事例代码:
public static void main(String[] args){
// parallelism并行度
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(30);
Integer invoke = forkJoinPool.invoke(fibonacci);
System.out.println(invoke);
}
// 递归任务 RecursiveTask RecursiveAction都是ForkJoinTask的子类 前者有返回值后者没有返回值
static class Fibonacci extends
RecursiveTask<Integer> {
final int n;
Fibonacci(int n){this.n = n;}
@Override
protected Integer compute(){
if(n <= 1){
return n;
}
Fibonacci fibonacci = new Fibonacci(n-1);
//异步执行子任务
fibonacci.fork();
Fibonacci fibonacci2 = new Fibonacci(n-2);
//fibonacci.join()阻塞当前线程等待子任务的执行结果
return fibonacci2.compute() + fibonacci.join();
}
}
1.2 利用Fork/Join实现单机版的MapReduce
package com.zy.forkjoin;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
*统计文件中单词的出现数量
* @author: zy
* @Date: 2019-06-27 10:19
* @Copyright: 2019 www.lenovo.com Inc. All rights reserved.
*/
public class MapReduceDemo {
public static void main(String[] args){
//模拟文件
String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
// 创建 ForkJoin 线程池
ForkJoinPool fjp =
new ForkJoinPool(3);
// 创建任务
MR mr = new MR(
fc, 0, fc.length);
// 启动任务
Map<String, Long> result =
fjp.invoke(mr);
// 输出结果
result.forEach((k, v)->
System.out.println(k+":"+v));
}
/**
* 模拟类
*/
static class MR extends
RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;
/**
* 构造函数
* @param fc
* @param fr
* @param to
*/
MR(String[] fc, int fr, int to){
this.fc = fc;
this.start = fr;
this.end = to;
}
@Override protected
Map<String, Long> compute(){
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start+end)/2;
MR mr1 = new MR(
fc, start, mid);
mr1.fork();
MR mr2 = new MR(
fc, mid, end);
// 计算子任务,并返回合并的结果
return merge(mr2.compute(),
mr1.join());
}
}
/**
* 合并结果
* @param r1
* @param r2
* @return
*/
private Map<String, Long> merge(
Map<String, Long> r1,
Map<String, Long> r2) {
Map<String, Long> result =
new HashMap<>();
result.putAll(r1);
// 合并结果
r2.forEach((k, v) -> {
Long c = result.get(k);
if (c != null) {
result.put(k, c + v);
}
else {
result.put(k, v);
}
});
return result;
}
/**
* 统计单词数量
* @param line
* @return
*/
private Map<String, Long>
calc(String line) {
Map<String, Long> result =
new HashMap<>();
// 分割单词
String [] words =
line.split("\\s+");
// 统计单词数量
for (String w : words) {
Long v = result.get(w);
if (v != null) {
result.put(w, v + 1);
}
else{
result.put(w, 1L);
}
}
return result;
}
}
}