jdk1.8组合式异步编程CompeletableFuture的使用以及项目如何使用的总结

前言

对CompeletableFuture的常用方法做一个简单的总结,以及简单梳理下我项目中是如何用它的。

用法总结

runAsync和supplyAsync

CompeletableFuture内部实现四个静态方法来启动异步任务:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
这两个放在一起介绍,这是创建异步任务的操作,简单说下它俩的区别:

  • runAsync:它执行的任务是不带返回值的
  • supplyAsync:它执行的任务是带返回值的

来看看他们如何使用的:

/**
 * 无返回值
 */
public static void runAsync() throws ExecutionException, InterruptedException {
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
    },poolExecutor);
    System.out.println(future.get());
}

/**
 * 有返回值
 */
public static void supplyAsync() throws ExecutionException, InterruptedException {
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
        return new Integer(1);
    },poolExecutor);
    System.out.println(future.get());
}

计算结果完成时的回调方法

在任务执行的时候,计算完成或者出现异常,可以执行相应的动作,主要由以下几种方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

这里就说下whenComplete和whenCompleteAsync的区别:

  • whenComplete:它是使用和上步任务相同的线程来执行的
  • whenCompleteAsync:它是将这个任务丢到线程池中,交给线程池来完成

我们来看看代码:

public static void whenComplete(){
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
        if (new Random().nextInt()%2 == 0){
    
    
            int i = 12/0;
        }
        System.out.println("run end ......");
    },poolExecutor);
    future.whenCompleteAsync(new BiConsumer<Void, Throwable>() {
    
    
        @Override
        public void accept(Void aVoid, Throwable throwable) {
    
    
            System.out.println("执行完成");
        }
    });
    future.exceptionally(new Function<Throwable, Void>() {
    
    
        @Override
        public Void apply(Throwable throwable) {
    
    
            System.out.println("执行失败");
            return null;
        }
    });
}

thenApply 方法

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
主要由以下几种方法:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
来看看代码:

public static void thenApply() throws ExecutionException, InterruptedException {
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
        return new Integer(2);
    },poolExecutor);
    CompletableFuture<Integer> thenApply = future.thenApply(new Function<Integer, Integer>() {
    
    
        @Override
        public Integer apply(Integer integer) {
    
    
            return integer + 1;
        }
    });
    System.out.println("原始值");
    System.out.println(future.get());
    System.out.println("后来值");
    System.out.println(thenApply.get());
}

handle方法

handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
主要由以下方法:

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

来看看代码:

public static void handle() throws ExecutionException, InterruptedException {
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
        int i = 1/0;
        return new Integer(2);
    },poolExecutor);
    CompletableFuture<Integer> handle = future.handle(new BiFunction<Integer, Throwable, Integer>() {
    
    
        @Override
        public Integer apply(Integer integer, Throwable throwable) {
    
    
            if (throwable != null) {
    
    
                System.out.println(throwable.getMessage());
            } else {
    
    
                return integer + 1;
            }
            return null;
        }
    });
    System.out.println(handle.get());
}

thenAccept

消费上步处理的结果,接收任务的处理结果,并消费处理,无返回结果。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

看看代码

public static void thenAccept(){
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
        return new Integer(2);
    },poolExecutor);
    CompletableFuture<Void> thenAccept = future.thenAccept(new Consumer<Integer>() {
    
    
        @Override
        public void accept(Integer integer) {
    
    
            System.out.println("我的任务就是打印上面任务的结果:" + integer);
        }
    });
}

thenRun

这个处理方法有点类似与thenAccept,不同的是,它不关注上层的结果,只需要直到上层完成就执行我的任务了

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

看看代码:

public static void thenRun(){
    
    
    MyThreadPool pool = new MyThreadPool();
    ThreadPoolExecutor poolExecutor = pool.threadPoolExecutor();
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
    
    
        try {
    
    
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
    
    
            e.printStackTrace();
        }
        return new Integer(2);
    },poolExecutor).thenRun(()-> {
    
    
        System.out.println("thenRun...");
    });
}

项目中的使用

我的项目是一个电商项目,在商品模块下,有一个查询订单详情的功能,这里面就涉及到很多任务,比如

  1. skuId对应商品的基本信息的获取
  2. skuId对应spuId的全部销售属性
  3. 获取spu的介绍
  4. 获取spu的规格参数
  5. 获取sku的图片信息

这里面的2、3、4这些异步任务是依赖于1的执行完成后获取1中spuId的信息才能进行,所以这里,将2、3、4与1线程串行化起来,利用CompeletableFuture中的thenAcceptAsync的方法来做的,5是独立的,所以可以再创建一个异步任务
最后将这些任务都利用CompeletableFuture的allOf进行合并,最后输出。

总结

以上就是我对CompeletableFuture的学习和在项目中的应用。

猜你喜欢

转载自blog.csdn.net/MarkusZhang/article/details/108053437