线程协作
上一篇中如何正确创建线程池,介绍了ThreadPoolExecutor的execute(Runnable task)提交任务的方法,下面来讲解获取执行任务的返回结果。
1. 如何获取任务的执行结果?
java通过ThreadPoolExecutor提供的3个submit()方法,和一个FutureTask工具类来获取任务执行结果。
1.1 三个submit()方法
// 1. 提交Runnable任务
Future<?> submit(Runnable task);
// 2.提交Callable任务
<T> Future<T> submit(Callable<T> task);
// 3.提交Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);
可以看出三个submit() 方法返回值都是Future接口,该接口有5个方法。
// 取消任务
boolean cancel( boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 阻塞式获得任务执行结果,如果线程任务还没执行结束,那么调用get()的线程会被阻塞,直到任务执行结束才会唤醒
get();
// 阻塞式获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
而三个提交方法的区别是参数不同,下面分别来说一下。
- 方法参数是Runnable,而Runnable接口的run()方法是没有返回值的,所以这个Future只能断言任务结束,类似Thread.join()。
- 而Callable接口的call() 方法是有返回值的,所以可以调用Future接口的get()方法获得任务执行结果。
- 最后第三种提交方式,参数为Runable和result,Runable本无返回值,但是调用它的Future接口的get()方法返回的是result结果。
result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
下面是其第三种提交方式的经典用法:Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建Result对象r
Result r = new Result();
r.setAAA(a);
// 提交任务
Future<Result> future = executor.submit(new Task(r), r);
Result fr = future.get();
// 下面等式成立
fr === r;
fr.getAAA() === a;
fr.getXXX() === x
class Task implements Runnable{
Result r;
//通过构造函数传入result
Task(Result r){
this.r = r;
}
void run() {
//可以操作result
a = r.getAAA();
r.setXXX(x);
}
}
1.2 submit()和execute()方法的区别?
- 参数不一样,submit()的第三种提交方式,可以传入result参数。
- submit()执行的线程可以返回结果Future,而execute()不能返回执行结果。
- 异常处理不一样,submit()方法,你可以对Future的get()方法try,catch处理异常,而execute执行的线程,如果任务执行期间发生异常就会导致线程终止,无法处理异常,所以只能在线程run方法中提前try,catch处理异常。
//submit方法处理异常
try {
fs.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
executorService.shutdownNow();
e.printStackTrace();
return;
}
//execute方法处理异常,只能在Runnable接口或其他的run方法中处理异常。
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("sleep后");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
1.3 FutureTask工具类
上面的Future是一个接口,而FutureTask真的是一个工具类,它有两个构造函数:
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);//和上面的一样。
其实FutureTask实现了Runnable接口和Future接口,一方面可以作为任务提交给线程执行,另一方面也可以获得任务执行结果,可以看下面的实例代码:
// 创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
2. 实现最优的"烧水泡茶"程序
最优的烧水泡茶过程应该是下图这样的:
并发编程核心问题:分工,同步,互斥,首先要做的就是分工,就是如何高效地拆解任务并分配给线程,下面这种方式最优,
当然你可以用前面java工具类中协作的工具类CountDownLatch,join()都可以,这里我们用Future来实现:
首先创建两个FutureTask——ft1和ft2,ft1完成洗水壶,烧开水,泡茶,ft2完成洗茶壶,洗茶杯,拿茶叶。当然在ft1的泡茶之前需要ft2执行结束。
// 创建任务T2的FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());
// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1:洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1:烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取T2线程的茶叶
String tf = ft2.get();//拿到ft2的执行结果,没执行结束,阻塞等待(只是拿ft2的执行结果,并不是让ft2任务才执行)
System.out.println("T1:拿到茶叶:"+tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
}
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2:洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2:洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2:拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return "龙井";
}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井