JDK1.8-CompletableFuture

携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第4天,点击查看活动详情

简介

CompletableFuture是一个可以显式完成的Future(设置它的值和状态)可以用作CompletionStage,支持在它完成时触发的依赖函数和动作

当两个或多个线程试图执行complete、completeExceptionally或者cancel一个CompletableFuture,只有一个线程成功。

下面我们还是来看看CompletableFuture类继承和实现关系: CompletableFuture类图.drawio.png

为啥需要CompletableFuture

通过简介我们可以了解到CompletableFuture除了具备Future的获得异步执行果、取消任务等功能,还实现了CompletionStage具备异步编排的功能。有在《java线程池源码阅读》中提到Future的实现类FutureTask,使用ThreadPoolExecutor就可以满足具备Future的能力,所以CompletableFuture的优势在于实现了CompletionStage接口,可以对异步进行编排,在具体介绍功能前我们可以看看原来使用线程池不具备编排的功能有啥缺点。

阻塞或者轮询等待结果执行本任务下一步

如果有两个异步线程需要拿到结果进行下一步,只能Future.get()阻塞等待或者使用Future.isDone()进行轮询,但是阻塞和异步编程的设计理解相违背,而轮询的方式会消耗CPU,可以看看下面的例子:

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;

import java.util.concurrent.*;

public class GetResultToDoNext {
    private static ThreadPoolExecutor myThreadPoll = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("GetResultToDoNext", true));

    @SneakyThrows
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        long startTime = System.currentTimeMillis();

        Future<Integer> task1 = myThreadPoll.submit(() -> {
            Thread.sleep(300);
            return 1;
        });

        Future<Integer> task2 = myThreadPoll.submit(() -> {
            Thread.sleep(200);
            return 2;
        });

        Integer task1Result = task1.get();
        Integer task2Result = task2.get();

        otherTask(task1Result);
        otherTask(task2Result);
        System.out.println("消耗:" + (System.currentTimeMillis() - startTime));
    }

    @SneakyThrows
    private static void otherTask(Integer result) {
        Thread.sleep(100);
        System.out.println(result * result);
    }
}

输出:

1
4
消耗:603

可以发现其实这两个任务的后续动作不应该等到两个结果都拿到才执行,应该拿到后马上执行。

可以发现笔者创建的线程池是用了守护进程,因为main是非守护进程,如果线程池使用非守护进程当main结束后不会退出,因为线程池创建的线程为非守护线程会一直阻塞等待任务进来。

使用CompletableFuture

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;

import java.util.concurrent.*;
import java.util.function.Supplier;

public class GetResultToDoNext {
    private static ThreadPoolExecutor myThreadPoll = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("GetResultToDoNext", true));

    public static void main(String[] args) {
//        useFuture();
        useCompletableFuture();
    }

    private static void useCompletableFuture() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<Void> task1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @SneakyThrows
            @Override
            public Integer get() {
                Thread.sleep(300);
                return 1;
            }
        }, myThreadPoll).thenAccept(GetResultToDoNext::otherTask);

        CompletableFuture<Void> task2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @SneakyThrows
            @Override
            public Integer get() {
                Thread.sleep(200);
                return 2;
            }
        }, myThreadPoll).thenAccept(GetResultToDoNext::otherTask);

        task1.join();
        task2.join();
        System.out.println("消耗:" + (System.currentTimeMillis() - startTime));
    }

    @SneakyThrows
    private static void useFuture() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        long startTime = System.currentTimeMillis();

        Future<Integer> task1 = myThreadPoll.submit(() -> {
            Thread.sleep(300);
            return 1;
        });

        Future<Integer> task2 = myThreadPoll.submit(() -> {
            Thread.sleep(200);
            return 2;
        });

        Integer task1Result = task1.get();
        Integer task2Result = task2.get();

        otherTask(task1Result);
        otherTask(task2Result);
        System.out.println("消耗:" + (System.currentTimeMillis() - startTime));
    }

    @SneakyThrows
    private static void otherTask(Integer result) {
        Thread.sleep(100);
        System.out.println(result * result);
    }
}

输出:

1
4
消耗:426

使用CountDownLatch拿到多个线程结果

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;

import java.util.concurrent.*;

public class WaitThreadsResult {
    private static ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("UseCountDownLatchWait", true));

    public static void main(String[] args) {
        useCountDownLatch();
    }

    @SneakyThrows
    private static void useCountDownLatch() {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Future<Integer> task1 = myThreadPool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(100);
                countDownLatch.countDown();
                return 1;
            }
        });

        Future<Integer> task2 = myThreadPool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(100);
                countDownLatch.countDown();
                return 2;
            }
        });

        // 等待两个任务都完成
        countDownLatch.await();
        System.out.println(task1.get() + task2.get());
    }
}

使用CompletableFuture

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;

import java.util.concurrent.*;

public class WaitThreadsResult {
    private static ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("UseCountDownLatchWait", true));

    public static void main(String[] args) {
//        useCountDownLatch();
        useCompletable();
    }

    private static void useCompletable() {
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool);

        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            return 2;
        }, myThreadPool);

        task1.thenAcceptBoth(task2, (task1Result, task2Result) -> {
            System.out.println(task1Result + task2Result);
        }).join();
    }

    @SneakyThrows
    private static void useCountDownLatch() {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Future<Integer> task1 = myThreadPool.submit(() -> {
            countDownLatch.countDown();
            return 1;
        });

        Future<Integer> task2 = myThreadPool.submit(() -> {
            countDownLatch.countDown();
            return 2;
        });

        // 等待两个任务都完成
        countDownLatch.await();
        System.out.println(task1.get() + task2.get());
    }
}

可以发现更加优雅了。

扫描二维码关注公众号,回复: 14422170 查看本文章

基本使用

执行任务

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BaseUse {
    private static ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("UseCountDownLatchWait", true));

    public static void main(String[] args) {
        createTask();
    }

    private static void createTask() {
        //无返回值异步执行
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync");
        }, myThreadPool);

        //有返回值异步执行
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool);

        // 常值返回
        CompletableFuture<Integer> completedFuture = CompletableFuture.completedFuture(2);

        runAsync.join();

        System.out.println(supplyAsync.join());

        System.out.println(completedFuture.join());
    }
}

then-紧跟着执行

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BaseUse {
    private static ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("UseCountDownLatchWait", true));

    public static void main(String[] args) {
        thenDo();
    }

    private static void thenDo() {
        //不依赖上个任务,执行run
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool).thenRunAsync(() -> {
            System.out.println("thenRunAsync");
        });

        // 依赖上个任务,第二个任务不返回结果
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool).thenAcceptAsync((result) -> {
            System.out.println(result);
        }, myThreadPool);

        // 依赖上个任务,第二个任务返回结果
        CompletableFuture<Integer> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool).thenApplyAsync((result) -> {
            return 2 + result;
        }, myThreadPool);
        System.out.println(thenApplyAsync.join());

        // 依赖上一个任务的结果
        CompletableFuture<String> f = CompletableFuture.completedFuture("OK");
        CompletableFuture.supplyAsync(() -> {
            return 2;
        }).thenComposeAsync((o) -> {
            System.out.println(o);
            return f;
        }, myThreadPool);

        System.out.println(f.join());
    }

可以处理异常的

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BaseUse {
    private static ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("UseCountDownLatchWait", true));

    public static void main(String[] args) {
        doContainException();
    }

    private static void doContainException() {
        // 既可以处理异常也可以返回处理其正常返回的值
        CompletableFuture<String> handleAsync = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return "hello";
        }, myThreadPool).handleAsync((data, e) -> {
            e.printStackTrace();
            return data;
        }, myThreadPool);
        System.out.println(handleAsync.join());

        // 相当于finally
        CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return "hello";
        }, myThreadPool).exceptionally(e -> {
            e.printStackTrace();
            return "d";
        });
        System.out.println(exceptionally.join());

        // 任务完成或者异常时运行,无返回值
        CompletableFuture<String> whenCompleteAsync = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return "hello";
        }, myThreadPool).whenCompleteAsync((data, e) -> {
            System.out.println(e.getMessage());
            System.out.println(data);
        });
        whenCompleteAsync.join();
    }

    private static void thenDo() {
        //不依赖上个任务,执行run
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool).thenRunAsync(() -> {
            System.out.println("thenRunAsync");
        });

        // 依赖上个任务,第二个任务不返回结果
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool).thenAcceptAsync((result) -> {
            System.out.println(result);
        }, myThreadPool);

        // 依赖上个任务,第二个任务返回结果
        CompletableFuture<Integer> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
            return 1;
        }, myThreadPool).thenApplyAsync((result) -> {
            return 2 + result;
        }, myThreadPool);
        System.out.println(thenApplyAsync.join());

        // 依赖上一个任务的结果
        CompletableFuture<String> f = CompletableFuture.completedFuture("OK");
        CompletableFuture.supplyAsync(() -> {
            return 2;
        }).thenComposeAsync((o) -> {
            System.out.println(o);
            return f;
        }, myThreadPool);

        System.out.println(f.join());
    }
}    

组合

package com.study.completablefuture;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BaseUse {
    private static ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("UseCountDownLatchWait", true));

    public static void main(String[] args) {
        combinationTask();
    }

    @SneakyThrows
    private static void combinationTask() {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @SneakyThrows
            @Override
            public Integer get() {
                Thread.sleep(100);
                return 1;
            }
        }, myThreadPool);

        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @SneakyThrows
            @Override
            public Integer get() {
                Thread.sleep(300);
                return 2;
            }
        }, myThreadPool);

        // 所有任务完成才执行下一步
        CompletableFuture.allOf(task1, task2).thenRun(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                System.out.println(task1.get() + task2.get());
            }
        });

        // 任意一个任务执行完就不阻塞
        CompletableFuture<Object> future2 = CompletableFuture
                .anyOf(CompletableFuture.runAsync(new Runnable() {
                            @SneakyThrows
                            @Override
                            public void run() {
                                Thread.sleep(100);
                                System.out.println("ddf");
                            }
                        }),
                        CompletableFuture.runAsync(new Runnable() {
                            @SneakyThrows
                            @Override
                            public void run() {
                                Thread.sleep(200);
                                System.out.println("dddd");
                            }
                        }));
        //其中一个任务行完即可
        future2.join();
    }
}

使用需要注意的地方

参考

基础篇:异步编程不会?我教你啊!CompletableFuture(JDK1.8)

CompletableFuture原理与实践-外卖商家端API的异步化

异步编程利器:CompletableFuture详解 |Java 开发实战

一文带你玩转 CompletableFuture 异步编程

猜你喜欢

转载自juejin.im/post/7126362999488512031