Executor
Executor用于已知任务数的情况,将任务数提交给线程池,线程池自行分配管理线程。
注意:Callable和Runnable是等价的,但Runnable的run方法没有返回值,而Callable的call方法可以有返回值。
示例:
- SumTask类
import java.util.Random;
import java.util.concurrent.Callable;
//任务实现Callable接口
public class SumTask implements Callable<Integer> {
//定义每个线程计算的区间
private int startNumber;
private int endNumber;
public SumTask(int startNumber, int endNumber){
this.startNumber=startNumber;
this.endNumber=endNumber;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i=startNumber; i<=endNumber; i++)
{
sum = sum + i;
}
Thread.sleep(new Random().nextInt(1000));
System.out.printf("%s: %d\n",Thread.currentThread().getName(),sum);
return sum;//返回局部总和
}
}
- SumTest
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class SumTest {
public static void main(String[] args) {
//执行线程池,线程的个数一般为cpu的两倍左右,不能太多ִ
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(4);
List<Future<Integer>> resultList=new ArrayList<>();
//统计1-1000总和,分成十个任务提交
for (int i=0; i<10; i++){
SumTask calculator=new SumTask(i*100+1, (i+1)*100);
Future<Integer> result=executor.submit(calculator);//通过Future能获取线程返回值
resultList.add(result);
}
//每隔50ms,轮询等待10个任务结束
do {
System.out.printf("Main:已经完成多少个任务:%d\n",executor.getCompletedTaskCount());
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
System.out.printf("Main: Task %d: %s\n",i,result.isDone());//获取当前任务是否完成
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (executor.getCompletedTaskCount()<resultList.size());
// 所有任务都结束
int total = 0;
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
Integer sum=null;
try {
sum=result.get();//调用get()方法获取函数返回值
total = total + sum;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.printf("1-1000总和:" + total);
// 关闭线程池
executor.shutdown();
}
}
总结:
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(4);
创建线程池executor.submit(Task);
向线程池提交任务executor.getCompletedTaskCount();
获得完成任务数Future.isdone();
通过Future获取当前任务是否完成Future.get();
通过Future获取任务返回值executor.shutdown();
关闭线程池executor.getPoolSize();
获取线程池大小
Fork-join
分治编程,适用于任务量不好确定的场合。
示例:
- SumTask
import java.math.BigInteger;
import java.util.concurrent.RecursiveTask;
//分任务求和extends RecursiveTask<Long>
public class SumTask extends RecursiveTask<Long> {
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
public static final int threadhold = 5;//阈值
@Override
protected Long compute() {
Long sum = 0L;
// 如果任务足够小就直接执行
boolean canCompute = (end - start) <= threadhold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum = sum + i;
}
} else {
// 任务大于阈值,分裂为2个任务
int middle = (start + end) / 2;
SumTask subTask1 = new SumTask(start, middle);
SumTask subTask2 = new SumTask(middle + 1, end);
invokeAll(subTask1, subTask2);
Long sum1 = subTask1.join();
Long sum2 = subTask2.join();
// 结果合并
sum = sum1 + sum2;
}
return sum;
}
}
- SumTest
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
//分任务求和
public class SumTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建执行线程池
ForkJoinPool pool = new ForkJoinPool();
//ForkJoinPool pool = new ForkJoinPool(4);
//创建任务
SumTask task = new SumTask(1, 10000000);
//提交任务
ForkJoinTask<Long> result = pool.submit(task);
//等待结果
do {
System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount());
System.out.printf("Main: Paralelism: %d\n",pool.getParallelism());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
//输出结果
System.out.println(result.get().toString());
}
}
总结:
ForkJoinPool pool = new ForkJoinPool();
创建执行线程池SumTask task = new SumTask(1, 10000000);
创建任务ForkJoinTask<Long> result = pool.submit(task);
提交任务result.get()
获得计算结果也即重写的compute函数返回值task.isDone()
检测任务是否完成pool.getActiveThreadCount()
获得整个线程池里的线程数pool.getParallelism()
获得线程池里的并行度(有多少个线程同时在工作
数据结构的使用
因此需要有一些适用于并发的数据结构: