文章目录
- 一.CompletionService
- 二.CompletableFuture
-
- 1.为什么使用CompletableFuture
- 2.创建异步任务
- 3.异步回调
- 4.组合处理
-
- 4.1.thenAcceptBoth - 2个任务同时完成后的结果消费
- 4.2.thenCombin-2个任务同时完成后的结果合并
- 4.3.runAfterBoth-2个任务同时完成时的回调
- 4.4.applyToEither-2个任务任意一个完成后的结果处理
- 4.5.acceptEither-2个任务任意一个完成后的结果消费
- 4.6.runAfterEither-2个任务任意一个完成后的回调
- 4.7.thenCompose-任务串行
- 4.8.completeExceptionally-异常处理
- 4.9.allOf和anyOf
- 4.10.场景1:多个异步任务串行执行
- 4.11.场景2: 多个异步任务并行执行
补.常用多线程并发获取返回结果方法汇总
描述 | Future | FutureTask | CompletionService | CompletableFuture |
---|---|---|---|---|
原理 | Future接口 | 接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future+Runnable | 内部通过阻塞队列+FutureTask接口 | Java8 实现了Future, CompletionStage两个接口 |
多任务并发执行 | 支持 | 支持 | 支持 | 支持 |
获取任务结果的顺序 | 按照提交顺序获取结果 | 未知 | 支持任务完成的先后顺序 | 支持任务完成的先后顺序 |
异常捕捉 | 自己捕捉 | 自己捕捉 | 自己捕捉 | 原生API支持,返回每个任务的异常 |
建议 | CPU高速轮询,耗资源,或者阻塞,可以使用,但不推荐 |
功能不对口,并发任务这一块多套一层,不推荐使用 |
推荐使用 ,Java8 之前最好的方案 |
J API极端丰富,配合流式编程,推荐使用 ! |
什么是异步编程?
- 异步编程是编写
非阻塞的代码
,运行的任务在一个单独的线程
,与主线程隔离
,并且会通知主线程它的进度
,成功或者失败
。在这种方式中,主线程不会被阻塞
,不需要一直等到子线程完成。主线程可以并行的执行其他任务。
使用这种并行方式,可以极大的提高程序的性能。
一.CompletionService
1.为什么使用CompletionService
java.util.concurrent.CompletionService 是对 ExecutorService 封装的一个增强类
,优化了获取异步操作结果的接口。 主要解决了 Future 阻塞
的问题。CompletionService的实现类有 ExecutorCompletionService
- CompletionService 实现内能够
一边处理 submit 的线程的任务
,一边处理已完成任务的结果
。这样就可以将执行任务与处理任务分离开来进行处理。
- 使用
submit()
执行任务,使用take()
取得已完成的任务(Future),并按照任务完成时间顺序
处理它们的结果。
假设我们要向线程池提交一批任务,并获取任务结果。一般的方式是提交任务后,从线程池得到一批 Future 对象集合,然后依次调用其 get() 方法。
- 因为我们会要按固定的顺序来遍历 Future 元素,而 get() 方法又是阻塞的, 遍历到某个 Future 元素,因为当前任务因为
执行时间较长
,而在他后面的任务可能执行时间较短
,导致已经提前完成
的任务不能得到及时处理
,只能等到前面的任务执行完成后才能处理
,显然这种情况用Future不合适的,效率也不高。
CompletionService主要应用场景: 同时执行多个Callable任务,并且需对任务的返回结果进行处理。若想优先处理先执行完的任务结果
,使用它尤其方便。
2.CompletionService接口常用方法
-
CompletionService是一个
泛型接口
,V
表示的就是当前任务结果返回类型
、任务提交类型
-
通过调用
take
和poll
的方法来获取到Future值
public interface CompletionService<V> {
//执行带返回值的Callable任务
Future<V> submit(Callable<V> task);
//执行带返回值的Runnable 任务,结果值需要自己传入
Future<V> submit(Runnable task, V result);
//获取先完成任务的Future对象,take()获取到的 Future对象,在调用 get()时,还是会阻塞的
Future<V> take() throws InterruptedException;
//获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null,poll() 方法没有阻塞效果
Future<V> poll();
/*等待指定的时间(timeout + unti),如果在 timeout 时间内获得结束时立即向下继续执行,如果超过时也立即向下执行。*/
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
//异常处理=>在某个线程内抛出异常,如果只是通过 take() 方法获取到 Future 对象,那么无法在 控制台 中得到相关的异常信息。只有调用 Future 对象的 get() 方法才能获得异常栈信息
}
3.CompletionService源码解析
- ExecutorCompletionService实际上是
ExecutorService
和BlockingQueue
的结合体,ExecutorService用来提交任务
,而BlockingQueue
用来保存Future的执行结果
ExecutorCompletionService
是CompletionService接口的唯一实现类
。可以将执行完成的任务结果放到阻塞队列中
,这样等待结果的线程,如果执行take()方法
会得到结果并恢复执行。甚至可以调用poll()方法不阻塞获取结果
源码如下
public class ExecutorCompletionService<V> implements CompletionService<V> {
//执行线程的线程池
private final Executor executor;
//阻塞队列,保存完成的Future;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
/**
继承FutureTask,并实现了done()方法,done()方法把任务放到了ExecutorCompletionService的阻塞队列中(所以QueueingFuture是私有类),done()方法只有在任务执行完成后才会调用。
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() {
completionQueue.add(task); }
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
//传入一个线程池,初始化默认任务完成队列为LinkedBlockingQueue
public ExecutorCompletionService(Executor executor) {
if (executor == null) throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
//传入一个线程池以及任务完成队列
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null) throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
//可以看出ExecutorCompletionService需要`依赖线程池Executor对象`,
//其大部分功能是使用线程池 `ThreadPoolExecutor`实现的。
//执行带返回值的Callable任务,把任务封装FutureTask并作为QueueingFuture的属性保存,然后提交QueueingFuture到线程池;
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
//调用线程池的execute方法执行封装好的任务QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
//执行带返回值的Runnable 任务,结果值需要自己传入,把任务封装FutureTask并作为QueueingFuture的属性保存,然后提交QueueingFuture到线程池;
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
//调用线程池的execute方法执行封装好的任务QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
//获取先完成任务的Future对象,take()获取到的 Future对象,在调用 get()时,还是会阻塞的
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
//获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null,poll() 方法没有阻塞效果
public Future<V> poll() {
return completionQueue.poll();
}
//等待指定的时间(timeout + unti),如果在 timeout 时间内获得结束时立即向下继续执行,如果超过时也立即向下执行。
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
-
CompletionService 之所以能够做到这点,是因为它没有采取依次遍历 Future 的方式,而是在内部维护了一个保存
Future类型
的的结果队列
,当任务的任务完成后马上将结果放入队列,那么从队列中取到的就是最早完成的结果。
通过使用BlockingQueue的
take或poll方法
,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。 -
如果
队列为空
,那么take() 方法会阻塞直到队列中出现结果为止
。 CompletionService 还提供一个poll() 方法
,返回值与 take() 方法一样,不同之处在于它不会阻塞
,如果队列为空则立刻返回 null
。这算是给用户多一种选择。
4.CompletionService使用方式
ExecutorCompletionService它本身不包含线程池
,创建一个 ExecutorCompletionService需要先创建一个Executor
,然后通过构造方法
传入。
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
CompletionService 提交任务的方式与 ExecutorService 一样
:
completionService.submit(new Worker(1));//Worker自己创建的一个实现了callable的任务类,当前方法也可以返回一个Future
当你需要获得结果的时候,你不需要再持有 Future 集合。如果要得到最早的执行结果,只需要像下面这样:
String result = completionService.take().get();
take()
方法返回的是最早完成的任务的结果
,这个就解决了一个任务被另一个任务阻塞的问题,甚至可以通过循环调用poll()方法不阻塞获取结果
。
下面是一个完整的例子:
public class ExecutorServiceTest {
public static void main(String[] args) {
//创建6个任务
List<Worker> workers = new ArrayList<>();
for (int i = 6; i >= 1; i--) {
//创建任务并随机设置 线程休眠倍数
workers.add(new Worker(new Random().nextInt(10)));
}
//创建线程池ExecutorCompletionService,传入一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
//循环提交任务
workers.forEach(completionService::submit);
//循环获取任务结果
workers.forEach(worker -> {
try {
String result = completionService.take().get();
System.out.println("执行结果 " + result);
} catch (InterruptedException | ExecutionException e) {
System.out.println("获取结果失败");
}
});
executorService.shutdown();
}
public static class Worker implements Callable<String> {
private final int index;
public Worker(int index) {
this.index = index;
}
@Override
public String call() throws Exception {
//休眠毫秒
int sleepTime = 200 * index;
System.out.println(Thread.currentThread().getName() + " 正在运行,休眠时间="+sleepTime+"ms");
//线程休眠
Thread.sleep(sleepTime);
//返回结果
return Thread.currentThread().getName() + " is Done";
}
}
}
二.CompletableFuture
1.为什么使用CompletableFuture
在Java 8
中, 新增加了一个包含50个方法左右的类
: CompletableFuture,默认
依靠fork/join框架
启动新的线程实现异步与并发的,提供了非常强大的Future的扩展功能
,可以帮助我们简化异步编程
的复杂性,提供了函数式编程
的能力,可以通过回调函数
的方式处理返回结果
,并且提供了转换和组合CompletableFuture的方法
。
主要是为了解决Future模式的缺点:
- a. Future虽然可以实现异步获取线程的执行结果,但是
Future没有提供通知机制
,调用方无法得知Future什么时候执行完的问题
。 - b. 想要获取Future的结果,要么使用
阻塞
, 在future.get()
的地方等待Future返回结果
,这时会变成同步操作
。要么使用isDone()
方法进行轮询
,又会耗费无谓的 CPU 资源
。 - c. 从 Java 8 开始引入了
CompletableFuture
,它针对Future做了改进,可以传入回调对象
,当异步任务完成
或者发生异常时
,自动调用回调对象的回调方法。
类图
CompletionStage(完成阶段)
-
CompletionStage: 代表
异步任务执行过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
-
一个阶段的执行可以是一个
Function
,Consumer
或者Runnable
。比如:stage.thenApply(x -> square(x)) .thenAccept(x -> System.out.print(x)) .thenRun(() -> System.out.println())
-
一个阶段的执行
可能是被单个阶段的完成触发
,也可能是由多个阶段一起触发
CompletionStage接口实现流式编程
- J此接口包含38个方法、这些方法主要是为了支持函数式编程中流式处理。
2.创建异步任务
CompletableFuture 提供了四个静态方法来创建一个异步操作。
//runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable<T> task) 方法
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
//这两方法的效果跟submit是一样的
-
runAsync():它以
Runnabel类型
为参数,所以返回结果为空。用于没有返回值的任务。
-
supplyAsync():以
Supplier<U>函数式接口
类型为参数,返回类型为U
。用于有返回值的任务, -
没有指定Executor
的方法会使用默认线程池ForkJoinPool.commonPool()
。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
-
注意:这些线程都是
Daemon线程(守护线程)
,主线程结束,Daemon线程不结束,只有JVM关闭时,生命周期终止。
简单用法
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//长时间的计算任务
try {
System.out.println("计算型任务开始");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "计算型任务结束";
});
Thread.sleep(2000);
System.out.println(future.get());
}
执行结果
3.异步回调
3.1.thenApply / thenApplyAsync-结果转换
当上一个的CompletableFuture
执行完后,将结果传递给函数fn
,将fn的结果
作为新的CompletableFuture任务
的参数,进行新一轮的处理
。因此它的功能相当于将CompletableFuture<T>
转换成 CompletableFuture<U>
- thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中
- 该方法并不是马上执行的,也不会阻塞,而是在前一个任务完成后继续执行。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U>
- T:上一个任务返回结果的类型
- U:当前任务的返回值类型
示例
@Test
public void testD() throws ExecutionException, InterruptedException {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long result = new Random().nextInt(100);
System.out.println("result1="+result);
return result;
}
}).thenApply(new Function<Long, Long>() {
@Override
public Long apply(Long t) {
long result = t*5;
System.out.println("result2="+result);
return result;
}
});
long result = future.get();
System.out.println(result);
}
执行结果
可以看出第二个任务依赖第一个任务的结果。
3.2.thenAccept-消费结果
thenAccept 同 thenApply
接收上一个任务的返回值作为参数,但是回调方法无返回值
接收任务的处理结果,并消费处理,无返回结果。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
示例
@Test
public void TestF() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
//处理上一次任务结果
.thenApply(s -> s + " world!")
//处理上一次任务结果
.thenApply(s -> s + "This is CompletableFuture demo")
//处理上一次任务结果
.thenApply(String::toLowerCase)
//使用最终结果
.thenAccept(new Consumer() {
@Override
public void accept(Object o) {
System.out.println("消费结果:"+o);
}
});
}
执行结果
从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理
。并没有后续的输错操作。
3.3.thenRun-任务完成后触发的回调
thenRun 是上一个任务完成后
触发的回调,方法没有入参
,也没有返回值
。
跟
thenAccept
方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成
,就开始执行thenRun
。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
示例
@Test
public void TestG() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return new Random().nextInt(10);
}
}).thenRun(() -> {
System.out.println("thenRun ...");
});
future.get();
}
执行结果
该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把返回结果传给 thenRun 方法。
只是处理任务后,执行 thenAccept 的后续操作。
3.4.exceptionally-异常回调
xceptionally方法指定某个任务执行异常时
触发的回调方法,会将抛出异常作为参数传递到回调方法中
,如果该任务正常执行
则会exceptionally方法返回的CompletionStage
的result
就是该任务正常执行的结果
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
示例
@Test
public void testC() throws InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
//随机数小于0.3
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
System.out.println("run end ...");
});
//产生异常时触发
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败!" + t.getMessage());
return null;
}
});
TimeUnit.SECONDS.sleep(2);
}
执行异常
3.5.whenComplete-任务完成,抛出异常时的回调
当CompletableFuture的任务执行完成,或者抛出异常
的时执行的回调函数。主要是下面的方法:
whenComplete是当
某个任务执行完成后
执行的回调方法,会将执行结果
或者执行期间抛出的异常
传递给回调方法,如果是正常执行则异常为null
,回调方法对应的CompletableFuture
的result
和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常,
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
-
可以看到Action的类型是
BiConsumer<? super T,? super Throwable>
它可以处理正常的计算结果
,或者异常情况
。 -
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是
执行当前任务的线程
执行继续执行 whenComplete
的任务。 - whenCompleteAsync:是执行
把 whenCompleteAsync
这个任务继续提交给线程池
来进行执行
。
示例
@Test
public void testC() throws InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
//随机数小于0.3
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
System.out.println("run end ...");
});
//当前任务执行完成后继续执行whenComplete的任务
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("执行完成!");
}
});
//产生异常时触发
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败!" + t.getMessage());
return null;
}
});
TimeUnit.SECONDS.sleep(2);
}
执行成功
可见CompletableFuture的优点是:
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行。
3.6.handle-任务结果处理
- handle 是
执行任务完成时对结果的处理。
- handle 方法和
thenApply
方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务
。而thenApply 任务中出现异常则不执行 thenApply 方法
。 - 即: 任务完成或者任务过程中抛出异常都会进入handle
跟whenComplete基本一致,区别在于
handle的回调方法有返回值
,且handle方法返回的CompletableFuture
的result是回调方法的执行结果或者回调方法执行期间抛出的异常
,与原始CompletableFuture的result无关
了 - handle 方法和
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例
@Test
public void TestE() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int i= 10/0;//抛出异常
return new Random().nextInt(10);
}
//无论任务是否产生异常都会进入handle
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if(throwable==null){
result = param * 2;
}else{
System.out.println(throwable.getMessage());
}
return result;
}
});
System.out.println(future.get());
}
执行结果
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作,并不会因为出现异常而中断任务
。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
4.组合处理
4.1.thenAcceptBoth - 2个任务同时完成后的结果消费
当两个CompletionStage都执行完成后
,把结果一块交给thenAcceptBoth来进行消耗
- 将两个CompletableFuture组合起来,只有这
两个都正常执行完
了才会执行某个任务,thenCombine会将两个任务的执行结果
作为方法入参
传递到指定方法中,且该方法有返回值
- 注意两个任务中只要有一个
执行异常
,则将该异常信息
作为指定任务的执行结果
。
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
示例
@Test
public void TestI() throws InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
}
});
f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer t, Integer u) {
System.out.println("f1=" + t + ";f2=" + u + ";");
}
});
}
执行结果
- thenAcceptBoth的,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。
4.2.thenCombin-2个任务同时完成后的结果合并
- thenCombine(): 会把
两个 CompletionStage 的任务都执行完成后
,把两个任务的结果
传参给BiFunction进行结果合并
操作。
- 将两个CompletableFuture组合起来,只有这
两个都正常执行完
了才会执行某个任务,thenCombine会将两个任务的执行结果
作为方法入参
传递到指定方法中,且该方法有返回值
- 注意两个任务中只要有一个
执行异常
,则将该异常信息
作为指定任务的执行结果
。- 两个CompletionStage是
并行执行的
,它们之间并没有先后依赖顺序
,other并不会等待
先前的CompletableFuture执行完毕后再执行。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
示例
@Test
public void TestH() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "world!";
}
});
//使用thenCombine是将future1 和future2的结果汇总,这一点跟thenCompose()不同。其中future1和future2是并行执行的。
CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
@Override
public String apply(String t, String u) {
return t+" "+u;
}
});
System.out.println(result.get());
}
执行结果
- 其实从功能上来讲,它的功能更像
thenAcceptBoth
,只不过thenAcceptBoth
是纯消费
,它的回调参数没有返回值,而thenCombine的回调函数有返回值。
4.3.runAfterBoth-2个任务同时完成时的回调
当两个CompletionStage
都正常执行完成的时候
,执行一个Runnable
,这个Runnable并不使用任务返回的结果。
- 将两个CompletableFuture组合起来,只有这
两个都正常执行完
了才会执行某个任务,runAfterBoth没有入参,也没有返回值
。- 注意两个任务中只要有一个
执行异常
,则将该异常信息
作为指定任务的执行结果
。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
@Test
public void TestM() throws InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.runAfterBoth(f2, new Runnable() {
@Override
public void run() {
System.out.println("上面两个任务都执行完成了。");
}
});
//休眠6秒防止主线程结束
TimeUnit.SECONDS.sleep(6);
}
执行结果
4.4.applyToEither-2个任务任意一个完成后的结果处理
applyToEither方法是:两个CompletionStage
,当任意一个
CompletionStage完成
的时候,回调函数fn
会被执行,它的返回值会当作新的CompletableFuture<U>的参数
。
- 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的
转化操作(有返回值)
- 将
两个CompletableFuture
组合起来,只有这两个都正常执行完
了才会执行某个任务,runAfterBoth没有入参,也没有返回值
。- 注意两个任务中只要有一个
执行异常
,则将该异常信息
作为指定任务的执行结果
。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
示例
@Test
public void TestJ() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
}
});
CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t) {
System.out.println(t);
return t * 2;
}
});
//休眠6秒防止主线程结束
TimeUnit.SECONDS.sleep(6);
System.out.println(result.get());
}
执行结果
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
4.5.acceptEither-2个任务任意一个完成后的结果消费
applyToEither方法是:两个CompletionStage
,当任意一个
CompletionStage完成
的时候,回调函数
会被执行,它的返回值会当作新的CompletableFuture<Void>的参数
。
- 将
两个CompletableFuture
组合起来,只有这两个都正常执行完
了才会执行某个任务,acceptEither将已经执行完成
的任务的执行结果
作为方法入参
,但是没有返回值
- 注意两个任务中只要有一个
执行异常
,则将该异常信息
作为指定任务的执行结果
。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
示例
@Test
public void TestK() throws InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
}
});
f1.acceptEither(f2, new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(t);
}
});
//休眠6秒防止主线程结束
TimeUnit.SECONDS.sleep(6);
}
执行结果
4.6.runAfterEither-2个任务任意一个完成后的回调
两个CompletionStage
,任意一个完成了
都会执行下一步的消费操作(Runnable)
- 将
两个CompletableFuture
组合起来,只有这两个都正常执行完
了才会执行某个任务,runAfterEither没有方法入参
,也没有返回值
- 注意两个任务中只要有一个
执行异常
,则将该异常信息
作为指定任务的执行结果
。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
@Test
public void TestL() throws InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
}
});
f1.runAfterEither(f2, new Runnable() {
@Override
public void run() {
System.out.println("上面有一个已经完成了。");
}
});
//休眠6秒防止主线程结束
TimeUnit.SECONDS.sleep(6);
}
执行结果
4.7.thenCompose-任务串行
thenCompose 可以用于组合多个CompletableFuture
,将前一个任务的返回结果
作为下一个任务的参数
,它们之间存在着先后顺序
thenCompose方法会在
某个任务执行完成后
,将该任务的执行结果
作为方法入参
然后执行指定的方法,该方法会返回一个新的CompletableFuture实例
,如果该CompletableFuture实例的result不为null
,则返回一个基于该result的新的CompletableFuture实例
;如果该CompletableFuture实例为null,则,然后执行这个新任务
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
示例
@Test
public void TestN() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
System.out.println("t1=" + t);
return t;
}
}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = param * 2;
System.out.println("t2=" + t);
return t;
}
});
}
});
System.out.println("thenCompose result : " + f.get());
}
执行结果
theApply 强调的是
类型转换
,而thenCompose强调的是执行顺序
,就是前一个任务结果作为下一个任务的参数
。
4.8.completeExceptionally-异常处理
为了能获取任务线程内发生的异常,需要使用 CompletableFuture的completeExceptionally方法
将导致CompletableFuture内发生的异常抛出。
- 这样,当执行任务发生异常时,
调用get()方法的线程
将会收到一个ExecutionException异常
,该异常接收了一个包含失败原因的Exception 参数
。
/**
* 任务没有异常 正常执行,然后结束
*/
@Test
public void test1() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
执行结果
/**
* 线程有异常 正常执行,然后无法结束,主线程会一直等待
*/
@Test
public void test2() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
int i = 1 / 0;
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
执行结果
/**
* 线程有异常 正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
* 主线程接收到 程序继续处理,至结束
*/
@Test
public void test3() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
int i = 1/0;
} catch (Exception e) {
// 告诉completableFuture任务发生异常了
completableFuture.completeExceptionally(e);
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
执行结果
4.9.allOf和anyOf
- allOf() 是,
多个任务都执行完成后才会执行
,只要有一个任务执行异常
,返回的CompletableFuture执行get()
方法时会抛出异常
,如果都是正常执行
,则get返回null
- **anyOf()**是,多个任务,
只要只有一个任务执行完成
,无论是正常执行或者执行异常,都会执行回调
,返回的CompletableFuture执行get()
方法返回的就是已执行完成的任务的执行结果
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) ;
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) ;
使用方式
public class CompletableFutureDemo {
public static void main(String[] args) {
//记录开始时间
Long start = System.currentTimeMillis();
//定长10线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//任务的个数
final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
try {
List<String> resultList = new ArrayList<String>();
//生成多个CompletableFuture任务
CompletableFuture[] completableFutureArr = taskList.stream().map(num -> CompletableFuture.supplyAsync(() -> calc(num), executor)
.thenApply(h -> Integer.toString(h))
//如需获取任务完成先后顺序,此处代码即可
.whenComplete((v, e) -> {
System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
resultList.add(v);
})).toArray(CompletableFuture[]::new);
//1.allOf:组合多个CompletableFuture为一个CompletableFuture, 所有子任务全部完成,组合后的任务CompletableFuture才会完成
CompletableFuture.allOf(completableFutureArr).whenComplete((v, th) -> {
System.out.println("所有任务执行完成触发,结果resultList=" + resultList + ",耗时=" + (System.currentTimeMillis() - start));
});
//2.anyOf:组合多个CompletableFuture为一个CompletableFuture, 所有子任务中只要存在一个任务优先完成,,组合后的任务CompletableFuture就会完成
/*CompletableFuture.anyOf(completableFutureArr).whenComplete((v, th) -> {
System.out.println("所有任务中最快完成的一个任务触发,结果resultList=" + resultList + ",耗时=" + (System.currentTimeMillis() - start));
});*/
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
//根据数字判断线程休眠的时间
public static Integer calc(Integer i) {
try {
if (i == 1) {
//任务1耗时3秒
Thread.sleep(3000);
} else if (i == 5) {
//任务5耗时5秒
Thread.sleep(5000);
} else {
//其它任务耗时1秒
Thread.sleep(1000);
}
System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
allOf执行结果
task线程内=pool-1-thread-1,任务i=2完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=2完成!,result=2,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-3,任务i=3完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=3完成!,result=3,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-9,任务i=9完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-8,任务i=8完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-7,任务i=7完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-6,任务i=6完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=9完成!,result=9,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-4,任务i=4完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-10,任务i=10完成!,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=6完成!,result=6,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=7完成!,result=7,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=8完成!,result=8,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=10完成!,result=10,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
任务=4完成!,result=4,异常e=null,执行时间=Tue Mar 09 15:01:54 CST 2021
task线程内=pool-1-thread-2,任务i=1完成!,执行时间=Tue Mar 09 15:01:56 CST 2021
任务=1完成!,result=1,异常e=null,执行时间=Tue Mar 09 15:01:56 CST 2021
task线程内=pool-1-thread-5,任务i=5完成!,执行时间=Tue Mar 09 15:01:58 CST 2021
任务=5完成!,result=5,异常e=null,执行时间=Tue Mar 09 15:01:58 CST 2021
所有任务执行完成触发,结果resultList=[2, 3, 9, 6, 7, 8, 10, 4, 1, 5],耗时=5053
anyOf执行结果
task线程内=pool-1-thread-1,任务i=2完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=2完成!,result=2,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-8,任务i=8完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-9,任务i=9完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=8完成!,result=8,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-6,任务i=6完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-3,任务i=3完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=6完成!,result=6,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=3完成!,result=3,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-4,任务i=4完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-10,任务i=10完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=9完成!,result=9,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
所有任务中最快完成的一个任务触发,结果resultList=[2],耗时=1053
task线程内=pool-1-thread-7,任务i=7完成!,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=10完成!,result=10,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=4完成!,result=4,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
任务=7完成!,result=7,异常e=null,执行时间=Tue Mar 09 15:01:13 CST 2021
task线程内=pool-1-thread-2,任务i=1完成!,执行时间=Tue Mar 09 15:01:15 CST 2021
任务=1完成!,result=1,异常e=null,执行时间=Tue Mar 09 15:01:15 CST 2021
task线程内=pool-1-thread-5,任务i=5完成!,执行时间=Tue Mar 09 15:01:17 CST 2021
任务=5完成!,result=5,异常e=null,执行时间=Tue Mar 09 15:01:17 CST 2021
4.10.场景1:多个异步任务串行执行
- 如果只是实现了异步回调机制,我们还看不出
CompletableFuture
相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行
- 如场景: 定义两个CompletableFuture,第一个CompletableFuture
根据证券名称查询证券代码
,第二个CompletableFuture根据证券代码查询证券价格
,这两个CompletableFuture实现串行操作如下:
- 如场景: 定义两个CompletableFuture,第一个CompletableFuture
public class CompletableFutureMain2 {
public static void main(String[] args) throws Exception {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}
//根据证券名称查询证券代码
static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return "601857";
}
//根据证券代码查询证券价格
static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
执行结果
4.11.场景2: 多个异步任务并行执行
- 除了串行执行外,
多个CompletableFuture还可以并行执行
。- 如场景:
同时
从新浪和网易查询证券代码,只要任意一个返回结果
,就进行下一步查询价格,
查询价格也同时从新浪和网易查询,只要任意一个返回结果
,就完成操作`:
- 如场景:
public class CompletableFutureMain3 {
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
执行结果
业务逻辑如下
除了anyOf()
可以实现“任意个CompletableFuture只要一个成功
”,allOf()
可以实现“所有CompletableFuture都必须成功”
,这些组合操作可以实现非常复杂的异步流程控制。
最后我们注意CompletableFuture的命名规则:
xxx()
:表示该方法将继续在已有的线程中执行(串行)
;xxxAsync()
:表示将异步在线程池中执行(并行)
。