fork英文含义为叉子也可以理解为拆分,join英文含义为加入也可以理解为汇集,所以forkjoin可以理解为拆分任务然后将结果汇聚在一起,这种思想和大数据中的MapReduce很像(input --> split --> map --> reduce --> output),所以其可以大致分为两步:任务拆分和结果合并。
下面我将以一个demo来演示下forkjoin的基本特质。
package com.dongnaoedu.network.humm.多线程.ForkJoin;
import java.sql.Time;
import java.util.ArrayList;
import java.util.concurrent.*;
/**
* @author Heian
* @time 19/07/28 22:24
* @description: 理解forkjoin(线程池 +任务拆分 )的原理
*/
public class ForkJoinDemo {
//自定义的任务
static ArrayList<String> urls = new ArrayList<String>(){
{
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
}
};
//模拟网络请求,假设这里请求耗时为100毫秒
public static String doRequest(String url) throws InterruptedException{
TimeUnit.MILLISECONDS .sleep (100);
return "访问的网址:"+url + "\n";
}
//设置我们需要的任务,从多少到多少位一组算一个任务
static class TaskGroup implements Callable<String>{
private int startIndex;
private int endIndex;
public TaskGroup(int startIndex,int endIndex) {
this.startIndex = startIndex;
this.endIndex = endIndex;
}
@Override
public String call() throws Exception {
String sb = "";
for (int i = startIndex-1; i <=endIndex-1 ; i++) {
String s = doRequest (urls.get (i));
sb+=s;
}
return sb;
}
}
/**
* 拆分任务
* @param pageSize 按照多大为一组,拆分任务
*/
public static void splitTask(int pageSize) throws ExecutionException, InterruptedException {
int size = urls.size ();
int groupCount = size/pageSize + 1; // 9/10 = 0
System.out.println ("任务大小为:" + size + ",分成" + groupCount + "组来处理");
ExecutorService executorService = Executors.newFixedThreadPool (4);
ArrayList<Future<String>> list = new ArrayList<> ();
long startTime = System.currentTimeMillis ();
/*------------------任务分组逻辑-----------------------*/
//因为可能最后一组会存在零星的,所以要单独拿出来
for (int i = 1; i <= groupCount-1; i++) {
int startPageNum = (i-1)*pageSize + 1;//起始页码 = (第几组-1)*pageSize +1 从1开始
int endPageNum = pageSize*i; //截止页码 = 第几组*pageSize
System.out.println (startPageNum + ":" + endPageNum);
Future<String> future = executorService.submit (new TaskGroup (startPageNum, endPageNum));
list.add (future);
}
//零星 最后一组
int startPageNum = (groupCount-1)*pageSize + 1;//起始页码 = (第几组-1)*pageSize +1 从1开始
int endPageNum = size; //截止页码 = 第几组*pageSize
System.out.println (startPageNum + ":" + endPageNum);
Future<String> future = executorService.submit (new TaskGroup (startPageNum, endPageNum));
list.add (future);
for (Future<String> item : list){
//拿到每组任务的返回值(很长的网址拼接)
System.out.println(item.get());//阻塞 每个组的任务会在换行
}
System.out.println ("耗时为" + (System.currentTimeMillis ()-startTime) + "毫秒");
}
/**
* 我们要做的就是模拟,将我们要访问的网址,拆分成多组任务,后交给线程池处理
*/
public static void main(String[] args) throws Exception{
splitTask(10);
}
}
首先我这里定义了一个String类型的字符串(有65个),来模拟网络请求(每个请求耗时100ms),然后定义了一个线程池(4个线程)去请求这网址,所以这里可以拆分非10个一组,然后就是7组,然后将每组返回的结果合并返回给结果集。控制台打印结果如下:
任务大小为:65,分成7组来处理
1:10
11:20
21:30
31:40
41:50
51:60
61:65
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
访问的网址:http://www.sina.com
访问的网址:http://www.baidu.com
耗时为2024毫秒
而如果要用forkjoin线程框架去实现它又该怎么做呢?首先ForkJoinTask有两个子类
- RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
- RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)
你定义的任务类必须去实现ForkJoinTask的子类,继而实现它的compute()方法,此方法就是去实现具体的拆分任务的逻辑,可类比我上面的splitTask(int pageSize)方法,这里我将任务拆分为2等分,然后两等分在继续拆分,知道拆分的边界符合我的预期<=10,即可,图解和实示例代码如下。
package com.dongnaoedu.network.humm.多线程.ForkJoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author Heian
* @time 19/07/28 22:24
* @description: 理解forkjoin(线程池 +任务拆分 )的原理
*/
public class ForkJoinTest {
//自定义的任务
static ArrayList<String> urls = new ArrayList<String>(){
{
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
}
};
//模拟网络请求,假设这里请求耗时为100毫秒
public static String doRequest(String url,int index) throws InterruptedException{
TimeUnit.MILLISECONDS .sleep (100);
return index + "-访问的网址:"+url + "\n";
}
//本质是一个线程池,默认的线程数量:CPU的核数
static ForkJoinPool forkJoinPool = new ForkJoinPool (Runtime.getRuntime().availableProcessors(),//我是4核
ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false);
/**
* 现在将一个大任务分成多组任务,而分组的边界有自己设定,直到边界不能再细分
*/
static class Job extends RecursiveTask<String>{
private List<String> list;//需要拆分的任务
private int start;
private int end;
public Job(List<String> list, int start, int end) {
this.list = list;
this.start = start;
this.end = end;
}
//不断的拆分任务,直到任务数小于10才不拆分,
@Override
protected String compute() {
int taskSize = end - start;//得到任务的大小
if (taskSize<=10){
//先把任务拆分好了,在将各组任务执行
System.out.println ("小于10" + Thread.currentThread ().getName ());
String result = "";
for (int i = start; i <end ; i++) {
try {
result += doRequest (urls.get (i),i);
} catch (InterruptedException e) {
e.printStackTrace ();
}
}
return result;
}else {
//拆分任务
int x = (start + end)/2;//将任务分成两份 奇数:3--> 1,2
System.out.println (x+ Thread.currentThread ().getName ());
//起三个线程去分解这些任务
Job job1 = new Job (urls,start,x);
ForkJoinTask<String> fork = job1.fork ();
Job job2 = new Job (urls,x,end);
ForkJoinTask<String> fork1 = job2.fork ();
//固定写法 类似于语法
String result = "";
result += job1.join ();
result += job2.join ();
return result;
}
}
}
/**
* 我们要做的就是模拟,将我们要访问的网址,拆分成多组任务,后交给线程池处理
*/
public static void main(String[] args) throws Exception{
long statrTime = System.currentTimeMillis ();
Job job = new Job (urls,0,urls.size ());
ForkJoinTask<String> result = forkJoinPool.submit (job);
System.out.println (result.get ());
System.out.println ("耗时:" + (System.currentTimeMillis ()-statrTime));
}
}
控制台信息如下:
32ForkJoinPool-1-worker-1
16ForkJoinPool-1-worker-2
8ForkJoinPool-1-worker-2
小于10ForkJoinPool-1-worker-2
48ForkJoinPool-1-worker-3
40ForkJoinPool-1-worker-3
24ForkJoinPool-1-worker-1
小于10ForkJoinPool-1-worker-1
小于10ForkJoinPool-1-worker-3
小于10ForkJoinPool-1-worker-0
小于10ForkJoinPool-1-worker-3
小于10ForkJoinPool-1-worker-1
56ForkJoinPool-1-worker-0
小于10ForkJoinPool-1-worker-0
小于10ForkJoinPool-1-worker-0
0-访问的网址:http://www.baidu.com
1-访问的网址:http://www.sina.com
2-访问的网址:http://www.baidu.com
3-访问的网址:http://www.sina.com
4-访问的网址:http://www.baidu.com
5-访问的网址:http://www.sina.com
6-访问的网址:http://www.baidu.com
7-访问的网址:http://www.sina.com
8-访问的网址:http://www.baidu.com
9-访问的网址:http://www.sina.com
10-访问的网址:http://www.baidu.com
11-访问的网址:http://www.sina.com
12-访问的网址:http://www.baidu.com
13-访问的网址:http://www.sina.com
14-访问的网址:http://www.baidu.com
15-访问的网址:http://www.sina.com
16-访问的网址:http://www.baidu.com
17-访问的网址:http://www.sina.com
18-访问的网址:http://www.baidu.com
19-访问的网址:http://www.sina.com
20-访问的网址:http://www.baidu.com
21-访问的网址:http://www.sina.com
22-访问的网址:http://www.baidu.com
23-访问的网址:http://www.sina.com
24-访问的网址:http://www.baidu.com
25-访问的网址:http://www.sina.com
26-访问的网址:http://www.baidu.com
27-访问的网址:http://www.sina.com
28-访问的网址:http://www.baidu.com
29-访问的网址:http://www.sina.com
30-访问的网址:http://www.baidu.com
31-访问的网址:http://www.sina.com
32-访问的网址:http://www.baidu.com
33-访问的网址:http://www.sina.com
34-访问的网址:http://www.baidu.com
35-访问的网址:http://www.sina.com
36-访问的网址:http://www.baidu.com
37-访问的网址:http://www.sina.com
38-访问的网址:http://www.baidu.com
39-访问的网址:http://www.sina.com
40-访问的网址:http://www.baidu.com
41-访问的网址:http://www.sina.com
42-访问的网址:http://www.baidu.com
43-访问的网址:http://www.sina.com
44-访问的网址:http://www.baidu.com
45-访问的网址:http://www.sina.com
46-访问的网址:http://www.baidu.com
47-访问的网址:http://www.sina.com
48-访问的网址:http://www.baidu.com
49-访问的网址:http://www.sina.com
50-访问的网址:http://www.baidu.com
51-访问的网址:http://www.sina.com
52-访问的网址:http://www.baidu.com
53-访问的网址:http://www.sina.com
54-访问的网址:http://www.baidu.com
55-访问的网址:http://www.sina.com
56-访问的网址:http://www.baidu.com
57-访问的网址:http://www.sina.com
58-访问的网址:http://www.baidu.com
59-访问的网址:http://www.sina.com
60-访问的网址:http://www.baidu.com
61-访问的网址:http://www.sina.com
62-访问的网址:http://www.baidu.com
63-访问的网址:http://www.sina.com
64-访问的网址:http://www.baidu.com
耗时:2559
可以发现按道理线程池处理6500毫秒的任务,理想情况是6500/4=1625,但方法一返回的结果为2024,方法2返回的结果为2559,看出对于小任务,ForkJoin并不占优,毕竟要处理不断去拆分的逻辑,而且我这里是定时操作,也就很难看出任务窃取了,这里只做演示而已,只是要明白每个线程内部都有自己的队列,并且是双向队列FIFO( First in, First out)先进先出,LIFO(Last in, First out)后进先出。
备注:还需研究源码,这里暂时不做深入考究,他日有时间在深究。
- Worker线程用LIFO的方法取出任务,后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)
- 当任务队列为空,会随机从其他的worker的队列中以FIFO拿走一个任务执行(工作窃取:steal work)
- 如果一个Worker线程遇到了join操作,有子任务没有,会等到这个任务结束。否则直接返回
- 如果一个Worker线程遇到了join操作,有子任务没有,会等到这个任务结束。否则直接返回