CompletableFuture实战

Future 模式是一种非常有用的设计思想。之前的两篇文章分别介绍了 Java 中 Future 模式实现的改进版 CompletionServiceFuture 模式(一)- Java 并发编程之 CompletionService)和如何实现自己的两种 Future 模式(Future 模式(二))。在 Java 8 中提供了一个更牛逼的 Future 模式实现:CompletableFuture,从名字就可以看出,这是一个完善的 Future。刚好最近在项目中使用了 Fork/Join + CompletableFuture 加快了脚本图片对比的效率,借此回顾一下。

举个我们平时开发做项目的场景,假设项目开发分为三个阶段:撸码 -> 自测 -> 合代码。项目要拆分成多个模块(Task)分配给多名开发(Thread)去完成,但是这样会有一些问题,有的开发撸码速度比较快,有的速度比较慢。为了充分压榨开发的劳动力(CPU),有谁代码撸完了就让他赶紧去做下一步任务(测试),也别等其他人了。

实现这个功能首先肯定需要使用 ExecutorService,由于要获取结果进行下一步操作,那么 Future 肯定要使用,如果使用 ExecutorService 的批处理方法 invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

需要挨个去判断集合中每个 Future 是否完成,但是 Futureget() 方法又是阻塞的。这个就比较麻烦了,可以看看用 CompletableFuture 怎么实现:

public class Tests {
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 10; i++) {
            CompletableFuture.supplyAsync(Tests::code).thenAccept(Tests::test);
        }
        System.in.read();
    }

    private static Integer code(){
        int workingTime = ThreadLocalRandom.current().nextInt(3, 10);
        try {
            TimeUnit.SECONDS.sleep(workingTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"完成工作,用时:"+workingTime);
        return workingTime;
    }

    private static void test(Integer result){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" -测试 【"+result+"】完成,用时:"+1);
    }
}

运行结果:

ForkJoinPool.commonPool-worker-7完成工作,用时:3
ForkJoinPool.commonPool-worker-2完成工作,用时:4
ForkJoinPool.commonPool-worker-3完成工作,用时:4
ForkJoinPool.commonPool-worker-7 -测试 【3】完成,用时:1
ForkJoinPool.commonPool-worker-3 -测试 【4】完成,用时:1
ForkJoinPool.commonPool-worker-2 -测试 【4】完成,用时:1
ForkJoinPool.commonPool-worker-4完成工作,用时:6
ForkJoinPool.commonPool-worker-5完成工作,用时:7
ForkJoinPool.commonPool-worker-4 -测试 【6】完成,用时:1
ForkJoinPool.commonPool-worker-1完成工作,用时:8
ForkJoinPool.commonPool-worker-6完成工作,用时:8
ForkJoinPool.commonPool-worker-5 -测试 【7】完成,用时:1
ForkJoinPool.commonPool-worker-6 -测试 【8】完成,用时:1
ForkJoinPool.commonPool-worker-1 -测试 【8】完成,用时:1
ForkJoinPool.commonPool-worker-2完成工作,用时:6
ForkJoinPool.commonPool-worker-2 -测试 【6】完成,用时:1
ForkJoinPool.commonPool-worker-7完成工作,用时:9
ForkJoinPool.commonPool-worker-3完成工作,用时:9
ForkJoinPool.commonPool-worker-7 -测试 【9】完成,用时:1
ForkJoinPool.commonPool-worker-3 -测试 【9】完成,用时:1

可以看到完成了第一阶段编码工作的线程立即就开始第二阶段的测试工作了,一刻不停。使用 CompletableFuture 完成这个功能是非常方便的,但是有一个要注意的地方,如果将 Demo 中的 main() 方法修改为:

 public static void main(String[] args) throws IOException {
        for (int i = 0; i < 10; i++) {
            CompletableFuture.supplyAsync(Tests::code).thenAccept(Tests::test);
        }
        //System.in.read();
    }

那么整个程序会很快结束,这是为啥呢。其实看 Demo 中 CompletableFuture 的 API 和 Demo 的运行结果,可以发现,线程池是肯定用到了的,但是我们并未自己去创建线程池,很显然,是 CompletableFuture 自己处理的,先看 supplyAsync() 静态方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

再看这个 asyncPool:

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

一般来说这个 asyncPool 就是 ForkJoinPool.commonPool(),为什么说是一般呢,其实这个参数挺有意思的,大家感兴趣可以继续看一下源码,比如 Java 8 中的 parallelStream 也与这个有关。

接着看 ForkJoinPool.commonPool(),在 ForkJoinPool 中有一个 DefaultForkJoinWorkerThreadFactory

static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}

java.util.concurrent.ForkJoinPool#registerWorker 中可以看到会将 ForkJoinWorkerThread 设置为守护线程。这也就是 Demo 中主线程一旦执行完整个程序就终止了的原因。

继续刚刚的“项目”,代码完成了,也测试了,那么最后要合代码了。合代码就是将所有开发人员的产出聚集到一起。比如我这次在项目中就需要将多个脚本执行的结果进行聚合。主要用的是 CompletableFutureallOf 方法:

    /**
     * Returns a new CompletableFuture that is completed when all of
     * the given CompletableFutures complete.  If any of the given
     * CompletableFutures complete exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause.  Otherwise, the results,
     * if any, of the given CompletableFutures are not reflected in
     * the returned CompletableFuture, but may be obtained by
     * inspecting them individually. If no CompletableFutures are
     * provided, returns a CompletableFuture completed with the value
     * {@code null}.
     *
     * <p>Among the applications of this method is to await completion
     * of a set of independent CompletableFutures before continuing a
     * program, as in: {@code CompletableFuture.allOf(c1, c2,
     * c3).join();}.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when all of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

代码示例如下:

public class Tests2 {

    public static void main(String[] args) {

        Map<Integer, String> works = new HashMap<>(6);
        works.put(0, "a");
        works.put(1, "b");
        works.put(2, "c");
        works.put(3, "d");
        works.put(4, "e");
        works.put(5, "f");

        Map<Integer, String> resultMapv = new HashMap<>(6);

        System.out.println("Strat." + new Date().toLocaleString());

        CompletableFuture[] array = works.entrySet().stream().map(integerStringEntry ->
                CompletableFuture
                        .supplyAsync(() -> process(integerStringEntry))
                        .whenComplete((result, e) -> {
                            resultMapv.put(result.getKey(),result.getValue());
                        })
        ).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(array).join();
        System.out.println("End."+new Date().toLocaleString());
        System.out.println(resultMapv);
    }

    private static Map.Entry<Integer, String> process(Map.Entry<Integer, String> entry) {
        int workingTime = ThreadLocalRandom.current().nextInt(1, 10);
        try {
            TimeUnit.SECONDS.sleep(workingTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "完成工作,用时:" + workingTime);
        entry.setValue(entry.getValue() + "_finished");
        return entry;
    }
}

运行结果:

Strat.2019-9-21 20:21:16
ForkJoinPool.commonPool-worker-1完成工作,用时:2
ForkJoinPool.commonPool-worker-4完成工作,用时:2
ForkJoinPool.commonPool-worker-3完成工作,用时:2
ForkJoinPool.commonPool-worker-6完成工作,用时:3
ForkJoinPool.commonPool-worker-2完成工作,用时:5
ForkJoinPool.commonPool-worker-5完成工作,用时:5
End.2019-9-21 20:21:21
{0=a_finished, 1=b_finished, 2=c_finished, 3=d_finished, 4=e_finished, 5=f_finished}

在最慢的工作完成后,任务聚合,获取最终的结果。

欢迎关注公众号:
​​在这里插入图片描述

发布了406 篇原创文章 · 获赞 127 · 访问量 81万+

猜你喜欢

转载自blog.csdn.net/Dongguabai/article/details/101145256