JDK1.8新加CompletableFuture,实现了Future<T>, CompletionStage<T>两个接口。
CompletableFuture vs ListenableFuture
jdk8提供对future的升级,会优于Guava的ListenableFuture
Futrue |
FutureTask |
CompletionService |
CompletableFuture |
|
原理 |
Futrue接口 |
接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future<V>+Runnable: |
内部通过阻塞队列+FutureTask接口 |
JDK8实现了Future<T>, CompletionStage<T>2个接口 |
多任务并发执行 |
支持 |
支持 |
支持 |
支持 |
获取任务结果的顺序 |
支持任务完成先后顺序 |
未知 |
支持任务完成的先后顺序 |
支持任务完成的先后顺序 |
异常捕捉 |
自己捕捉 |
自己捕捉 |
自己捕捉 |
源生API支持,返回每个任务的异常 |
建议 |
CPU高速轮询,耗资源,可以使用,但不推荐 |
功能不对口,并发任务这一块多套一层,不推荐使用。 |
推荐使用,没有JDK8CompletableFuture之前最好的方案,没有质疑 |
API极端丰富,配合流式编程,速度飞起,推荐使用! |
首先说明一下已Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行;
非Async基本都是同步执行
//CompletableFuture代码
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
CompletableFuture常常使用类似的函数式接口
/**
* Consumer<T> 消费型接口 消费对象
* void accept(T t);
* Supplier<T> 供给型接口 生成对象
* T get();
* Function<R,T> 函数型接口 指定特定功能
* R apply(T t);
* Predicate<T> 断言型接口 进行条件判断
* boolean test(T t);
* */
简单使用入门
执行任务、执行后续任务、获取任务结果;
runAsync/supplyAsync 无返回/有返回 Supplier
thenRun Runnable 执行完成后执行
whenComplete BiConsumer 结束后执行
/**
* run/supply 无返回值/有返回值
* public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
Supplier 是什么
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
future.whenComplete
**/
public static void testSimpleUsage() {
log.info("线程{}, 开始简单应用测试", Thread.currentThread().getName());
// case1: supplyAsync
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("线程{},执行有返回值任务", Thread.currentThread().getName());
return "返回结果:成功";
});
// thenRun,与supplyAsync同线程
future.thenRun(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("线程{},上一个任务执行完成,执行下一个任务①完成", Thread.currentThread().getName());
});
//thenRunAsync,另启动线程执行
future.thenRunAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("线程{},上一个任务执行完成,执行下一个任务②完成", Thread.currentThread().getName());
});
// 主动触发Complete结束方法
// future.complete("Manual complete value.");
future.whenComplete((v, e) -> {
log.info("线程{},执行结果: {}", Thread.currentThread().getName(), v);
log.info("线程{}, 执行异常信息: {}", Thread.currentThread().getName(), e);
});
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("线程{} ,简单应用测试结束了", Thread.currentThread().getName());
}
thenApply/thenApplyAsync 变换结果 Function
thenCombine/thenCombineAsync 聚合结果 CompletionStage BiFunction
thenCompose/thenComposeAsync 传递结果
/**
* 变换结果 这些方法的输入是上一个阶段计算后的结果,返回值是经过转化后结果
* public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
聚合结果
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
传递结果 同步异步
thenCompose():第一个CompletableFuture执行完毕后,传递给下一个CompletionStage作为入参进行操作。
thenComposeAsync()
**/
public static void testTransformResult() {
//转换结果
String result = CompletableFuture.supplyAsync(() -> "Hello ").thenApplyAsync(v -> v + "world").join();
log.info("异步转换结果是 {}", result);
//聚合结果
CompletableFuture<String> futureTaskA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureTaskB = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> combineResult = futureTaskA.thenCombine(futureTaskB, (x, y) -> x + "-" + y);
try {
log.info("合并结果为{}", combineResult.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//传递结果 结果由上一个CompletableFuture的结果组成
CompletableFuture<String> futureTaskAA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> completableFuture = futureTaskAA.thenCompose(string -> CompletableFuture.supplyAsync(() -> string + " world"));
try {
log.info("结果传递最终值为:{}", completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
其他Api方法列表
// 任意一个返回,就返回; 多个CompletableFuture
public static void testAnyOf(){
CompletableFuture<String> futureTaskA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureTaskB = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(futureTaskA, futureTaskB);
try {
System.out.println(objectCompletableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//多个CompletableFuture 都执行完成才返回
public static void testAllOf(){
CompletableFuture<String> futureTaskA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureTaskB = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(futureTaskA, futureTaskB);
voidCompletableFuture.whenComplete( (t, u) -> System.out.println("任务都完成了 "));
}
//thenAccept/thenAcceptAsync 对结果进行处理 参是Consumer 有入参无返回值;
//对比thenCompose 是无返回值,只对结果进行消费
public void testThenAccept(){
CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world"));
}
//thenAcceptBoth 对两个结果进行消费
public void testthenAcceptBoth() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> System.out.println(s1 + " " + s2));
}
// runAfterBoth 只关心这两个CompletionStage执行完毕,之后在进行操作 无返回值
// 对比 thenRunAsync;单个和double
public void testRunAfterBoth(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
}
// 谁计算的快,我就用那个CompletionStage的结果进行下一步的转化操作。
//applyToEither/applyToEitherAsync
public void testApplyToEither() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), s -> "组合" + s).join();
System.out.println(result);
}
//谁计算的快,我就用那个CompletionStage的结果进行下一步的消耗操作。
//acceptEither/acceptEitherAsync
public void testAcceptEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).acceptEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), s -> System.out.println(s));
}
//两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)。
//runAfterEither/runAfterEitherAsync
public void testRunAfterEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
}
//当运行时出现了异常,可以通过exceptionally进行补偿
//exceptionally
public void testExceptionally() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("测试一下异常情况");
}
return "s1";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
/**
结果 这里也可以看出,如果使用了exceptionally,就会对最终的结果产生影响,它没有口子返回如果没有异常时的正确的值
null
java.lang.RuntimeException: 测试一下异常情况
java.lang.RuntimeException: 测试一下异常情况
hello world
**/
public void testWhenComplete() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("测试一下异常情况");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println(s);
System.out.println(t.getMessage());
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
// handle 异常已返回handle的值,正常会返回正常值
// 结果 hello world
public void testHandle1() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//出现异常
if (1 == 1) {
throw new RuntimeException("测试一下异常情况");
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
//结果 s1
public void testHandle2() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//出现异常
if (1 == 1) {
throw new RuntimeException("测试一下异常情况");
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}