1-Future简介
1.1 简介
- Future 就是一个接口, 用于异步获取任务回调结果,这样中途可以做其他事,等到要取出结果再去取出, 它代表的是一种未来能获取到结果的对象
- 本质上是一种设计模式, 如果需要异步获得任务执行结果可以基于Future进行设计,比如RPC调用
1.2-Future源码
public interface Future<V> {
// 取消任务执行
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否取消
boolean isCancelled();
// 任务是否完成
boolean isDone();
//阻塞等待结果返回
V get() throws InterruptedException, ExecutionException;
//超时阻塞等待结果返回
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1.3-Future 基本使用
1.3.1 基于线程池
- 线程池提交的Callable类型任务返回的结果就是一个Future,我们可以通过Future的get方法异步获取到任务的执行结果.
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 泛型指定返回的结果类型
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("开始执行任务");
Thread.sleep(3000);
System.out.println("结束任务");
return "[1,2,3,4]";
}
});
System.out.println("我先去上班,下班回来再获取任务结果");
Thread.sleep(5000);
System.out.println("下班了开始获取任务结果: " + Thread.currentThread().getName());
String result = future.get(); // 阻塞等待任务完成
System.out.println("下班了获取任务结果为: "+result + " "+Thread.currentThread().getName());
输出:
我先去上班,下班回来再获取任务结果
开始执行任务
结束任务
下班了开始获取任务结果: main
下班了获取任务结果为: [1,2,3,4] main
1.3.2 基于Thread
// FutureTask 本质上也是Future,因为它实现了 Runnable 和 Future 接口
// 将Callable任务包装成Future
FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("开始任务");
Thread.sleep(3000);
System.out.println("结束任务");
return "[1,2,3,4]";
}
});
new Thread(task).start();
System.out.println("我先去上班,下班回来再获取任务结果");
System.out.println("开始获取任务结果");
String res = task.get(); // 阻塞等待任务完成
System.out.println("获取任务结果完成");
System.out.println(res);
输出:
开始获取任务结果
开始任务
结束任务
获取任务结果完成
[1,2,3,4]
1.4- 简单手写实现Future的功能
1.4.1 实现自定义Future接口
public class MyFuture<V> implements Future<V> {
private volatile boolean isEnd;
private V result;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isEnd;
}
// 当任务还未完成,释放对象锁并进入等待队列
@Override
public synchronized V get() throws InterruptedException, ExecutionException {
while (!isDone()){
this.wait();
}
return result;
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
// 当任务完成后回调, 更新任务状态并唤醒此对象锁上的所有阻塞线程
public synchronized void finishCompletion(V result) {
if(isDone())
return;
this.result = result;
this.isEnd = true;
notifyAll();
}
}
1.4.2 执行任务返回Future对象
public class FutureExecutors {
private FutureExecutors(){
}
// 异步执行有返回结果的任务并返回MyFuture对象,当异步执行完成通过finishCompletion回调
public static <T> MyFuture<T> excuteTask(MyTask<T> task){
MyFuture<T> f = new MyFuture<>();
new Thread(()->{
T res = task.run();
f.finishCompletion(res);
}).start();
return f;
}
public interface MyTask<T> {
T run();
}
}
1.4.3 测试
MyFuture<List<String>> myFuture = FutureExecutors.excuteTask(() -> {
System.out.println("开始执行任务");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束任务");
return Arrays.asList("1", "2", "3");
});
System.out.println("开始获取任务结果: " + Thread.currentThread().getName());
List<String> list = myFuture.get();
System.out.println("结束获取任务结果: "+list + " "+Thread.currentThread().getName());
输出:
开始获取任务结果: main
开始执行任务
结束任务
结束获取任务结果: [1, 2, 3] main
2、CompletableFuture 异步任务编排
2.1 简介
- 就是组合去使用多种异步任务进行协调编排,比如任务A执行后才执行任务B, 或者任务A,B,C都执行完后才回调
- 类似与Vue的Promise
2.2 runAsync方法
ExecutorService threadPool = Executors.newFixedThreadPool(10);
System.out.println("1");
CompletableFuture.runAsync(()->{
System.out.println("开始 [ "+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束 [ "+Thread.currentThread().getName());
},threadPool);
System.out.println("2");
输出:
- 其实就是普通异步任务执行而已
1
2
开始 [ pool-1-thread-1
结束 [ pool-1-thread-1
2.3 supplyAsync方法
- 与runAsync不同时, supplyAsync的异步任务的执行有返回值,通过future的get方法即可返回
System.out.println("1");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始 [ " + Thread.currentThread().getName());
/* try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("结束 [ " + Thread.currentThread().getName());
return "[1,2,3,4]";
}, threadPool);
Thread.sleep(2000);
System.out.println("2");
String s = future.get(); // 阻塞等待异步任务的结果返回
System.out.println(s);
System.out.println("3");
输出:
1
开始 [ pool-1-thread-1
2
结束 [ pool-1-thread-1
[1,2,3,4]
3
2.4 whenComplete和whenCompleteAsync方法
- 异步任务执行后触发的回调,并且会把上一级任务执行的结果和异常信息回调
whenComplete/whenCompleteAsync区别:
- 以后凡是方法不以 Async结尾, 就是异步任务间使用相同的线程执行,而 带Async结尾的方法可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//int i = 10 / 0;
System.out.println("结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool).whenComplete((result,exception)-> {
System.out.println("异步任务结束触发回调 [ " + Thread.currentThread().getName());
System.out.println("上一级任务结果为: " + result);
System.out.println("上一级任务异常为: " + exception);
}).exceptionally((exception) -> {
return "SB失败";});
String s = future.get(); // 阻塞等待异步任务结果返回
System.out.println(s);
输出:
开始 [ pool-1-thread-1
结束 [ pool-1-thread-1
异步任务结束触发回调 [ pool-1-thread-1
上一级任务结果为: Sb
上一级任务异常为: null
Sb
2.5 handler和handleAsync方法
- 与whenComplete区别就是, whenComplete最终执行结果为null无法返回, 而handler可以将执行结果最终进行返回
public void test04() throws ExecutionException, InterruptedException {
System.out.println("1");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool).handleAsync((result,exception)->{
System.out.println("异步任务结束触发回调 [ " + Thread.currentThread().getName());
return result +" is you !"; //将任务结果重新计算返回
});
String s = future.get();
System.out.println(s);
System.out.println("3");
}
输出:
开始 [ pool-1-thread-1
结束 [ pool-1-thread-1
异步任务结束触发回调 [ ForkJoinPool.commonPool-worker-1
Sb is you !
2.6 线程串行化
- 就是将多个异步任务串行化执行
/**
thenRun 任务A和任务B之间没关系, 最终不返回任务结果 (即future.get()没有返回值)
thenAcceptAsync 任务B可以拿到任务A的结果,最终不返回任务结果
thenApply 任务B可以拿到任务A的结果, 最终返回任务结果
*/
@org.junit.Test
public void test05() throws ExecutionException, InterruptedException{
System.out.println("1");
// 注意: 最后执行完future.get()没有返回值
/* CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool).thenRun(() -> {
System.out.println("任务B开始结束 [ " + Thread.currentThread().getName());
});
*/
/* CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool).thenAcceptAsync((result) -> {
System.out.println("任务B开始 [ " + Thread.currentThread().getName());
System.out.println("任务A的结果为:" + result);
System.out.println("任务B结束 [ " + Thread.currentThread().getName());
});*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool).thenApply((result) -> {
System.out.println("任务B开始 [ " + Thread.currentThread().getName());
System.out.println("任务A的结果为:" + result);
System.out.println("任务B结束 [ " + Thread.currentThread().getName());
return result + " is you ? ";
});
System.out.println(future.get());
System.out.println("3");
}
2.7 组合任务(都要完成)
- 组合任务都完成后可触发自定义任务
/**
* 6- 任务组合 (都要完成)
* 组合任务都完成后触发自定义任务
* runAfterBoth 组合两个future、最终不反悔future的结果
* thenCombine 可以获取到两个任务future的结果,有返回值
* thenAcceptBoth 可以获取到两个任务future的结果,无返回值
*
*/
@org.junit.Test
public void test06() throws ExecutionException, InterruptedException {
// 创建异步任务A和B
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A结束 [ " + Thread.currentThread().getName());
return 520;
}, threadPool);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
System.out.println("任务B开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务B结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool);
// 1-runAfterBothAsync
/* futureA.runAfterBothAsync(futureB,() -> {
System.out.println("任务AB都完成了 [ " + Thread.currentThread().getName());
},threadPool);*/
// 2-thenAcceptBothAsync -回调可拿到任务AB的结果
/* futureA.thenAcceptBothAsync(futureB,(resultA,resultB)->{
System.out.println("任务AB都完成了 [ " + Thread.currentThread().getName());
System.out.println("任务A结果为: "+resultA+"、任务B结果为: "+ resultB);
},threadPool);*/
/* futureA.thenAcceptBoth(futureB,(resultA,resultB)->{
// 示例不带Async 用的是 A 或者 B 的线程
System.out.println("任务AB都完成了 [ " + Thread.currentThread().getName());
System.out.println("任务A结果为: "+resultA+"、任务B结果为: "+ resultB);
});*/
// 3-thenCombineAsync -最终异步任务有返回结果
CompletableFuture<String> future = futureA.thenCombineAsync(futureB, (resultA, resultB) -> {
System.out.println("任务AB都完成了 [ " + Thread.currentThread().getName());
System.out.println("任务A结果为: " + resultA + "、任务B结果为: " + resultB);
return resultA + "------>" + resultB;
}, threadPool);
System.out.println(future.get());
}
2.7 组合任务II(一个完成即可触发)
- 组合任务只要有一个完成后可触发自定义任务
/**
* runAfterEither 无返回值, 无法获得上级任务的结果
* acceptEitherAsync 无返回值, 可以获得上级任务的结果
* applyToEitherAsync 有返回值, 可以获得上级任务的结果
*/
@org.junit.Test
public void test07() throws ExecutionException, InterruptedException {
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A结束 [ " + Thread.currentThread().getName());
return "A---->";
}, threadPool);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
System.out.println("任务B开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务B结束 [ " + Thread.currentThread().getName());
return "Sb";
}, threadPool);
/* CompletableFuture<Void> future = futureA.runAfterEither(futureB, () -> {
System.out.println("任务A或者B完成了 [ " + Thread.currentThread().getName());
});*/
/* CompletableFuture<Void> future = futureA.acceptEitherAsync(futureB, (result) -> {
System.out.println("任务A或者B完成了 [ " + Thread.currentThread().getName());
System.out.println("任务A或者B的结果为: [ " + result);
});*/
CompletableFuture<Object> future = futureA.applyToEitherAsync(futureB, (result) -> {
System.out.println("任务A或者B完成了 [ " + Thread.currentThread().getName());
System.out.println("任务A或者B的结果为: " + result);
return result + "******";
}, threadPool);
System.out.println(future.get());
}
2.8 多组合任务
- all方法 – future.get方法等待所有任务都执行完才返回(类似Promise.all)
- anyOf 方法 – future.get方法等待所有任务只要有一个执行完才返回
如果每个异步任务的结果都通过future.get方法去同步收集结果,将会大大耗费时间,
//比如任务A返回要3秒,任务B返回要2秒, 任务C返回要1秒,
futureA.get(); //3秒
futureB.get(); // 2秒
futureC.get(); // 1秒
// 最终耗时将是6秒
但是通过anyOf方法同时异步执行最终可能只需要3秒既可以返回所有数据
final static int TASK_COUNT = 5;
@org.junit.Test
public void test08() throws ExecutionException, InterruptedException {
// 初始化5个异步任务
CompletableFuture<String>[] taskArr = new CompletableFuture[TASK_COUNT];
for (int i = 0; i < TASK_COUNT; i++) {
int a = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务"+a+"开始 [ " + Thread.currentThread().getName());
try {
Thread.sleep(new Random().nextInt(5)*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务"+a+"结束 [ " + Thread.currentThread().getName());
return "任务"+a;
});
taskArr[i] = future;
}
// 1 - allOf -
// CompletableFuture<Void> allOf = CompletableFuture.allOf(taskArr);
// Void result = allOf.get(); // 阻塞等待所有任务结束,返回值为空
// 2- anyOf
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(taskArr);
Object result = anyOf.get(); // 阻塞等待任意一个任务结束
System.out.println("结果为: "+result); // 返回那一个先结束任务的结果
// 依次获得其他任务结果
for (int i = 0; i < TASK_COUNT; i++) {
System.out.println("任务"+i+"的结果为: "+taskArr[i].get());
}
}
2.7 总结
- 不带Async方法异步任务间使用相同线程
- 带额外的Executor参数的方法使用的是指定线程池执行任务,不带额外的Executor参数的方法就是用自己的线程执行
- run开头方法没有最终异步任务返回值, 也无法获取到上级任务结果
- acept开头方法没有最终异步任务返回值, 但是获取到上级任务结果
- apply开头方法 有最终异步任务返回值, 也获取到上级任务结果