从Java Future 到 Guava ListenableFuture实现异步非阻塞调用

前言

     随着移动互联网的蓬勃发展,手机App层出不穷,其业务也随之变得错综复杂。针对于开发人员来说,可能之前的一个业务只需要调取一次第三方接口以获取数据,而如今随着需求的增加,该业务需调取多个不同的第三方接口。通常,我们处理方法是让代码同步顺序的去调取这些接口。显然,调取接口数量的增加必然会造成响应时间的增加,势必会对系统性能造成一定影响

      为了保证系统响应迅速,需要寻找一种方法能够使调取接口能够异步执行,而java正好提供了类似的方法,在java.util.concurrent中包含了Future相关的类,运用其中的一些类可以进行异步计算,以减少主线程的等待时间。

        比如启动一个main方法,main中又包含了若干个其它任务,在不使用java future的情况下,main方法中的任务会同步阻塞执行,一个执行完成后,才能去执行另一个;如果使用java future,则main方法中的任务会异步执行,main方法不用等待一个任务的执行完成,只需往下执行就行。一个任务的执行结果又该怎么获取呢?这里就需要用到Future接口中的isDone()方法来判断任务是否执行完,如果完成完成则可获取结果,如果没有完成则需要等待,可见虽然主线程中的多个任务是异步执行,但是无法确定任务什么时候执行完成,只能通过不断去监听以获取结果,所以这里是阻塞的。这样,可能某一个任务执行时间很长会拖累整个主任务的执行。

     针对这样的情况,google对java.util.concurrent中的许多类进行封装,最终产生了google guava框架,其中com.google.common.util中的ListenableFuture就是本文要叙述的重点。查看com.google.common.util,发现其中的很多类都是对java.util.concurrent的封装,以增加特有的方法。ListenableFuture扩展了future方法,增加了addListener方法,该方法可以监听线程,并通过回调函数来获取结果,达到线程之间异步非阻塞执行

     首先,了解下同步、异步、阻塞、非阻塞相关概念;其次,简单介绍java future和guava future相关技术,并通过示例代码进一步对其进行理解;最后,对java future和guava future进行比较。


同步、异步、阻塞、非阻塞


1 例子

故事:老王烧开水。

出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。

老王想了想,有好几种等待方式

1.老王用水壶煮水,并且站在那里不管水开没开,每隔一定时间看看水开了没。-同步阻塞

老王想了想,这种方法不够聪明。

2.老王还是用水壶煮水,不再傻傻的站在那里看水开,跑去寝室上网但是还是会每隔一段时间过来看看水开了没有,水没有开就走人。-同步非阻塞

扫描二维码关注公众号,回复: 2549014 查看本文章

老王想了想,现在的方法聪明了些,但是还是不够好。

3.老王这次使用高大上的响水壶来煮水,站在那里但是不会再每隔一段时间去看水开,而是等水开了,水壶会自动的通知他。-异步阻塞

老王想了想,不会呀,既然水壶可以通知我,那我为什么还要傻傻的站在那里等呢,嗯,得换个方法。

4.老王还是使用响水壶煮水,跑到客厅上网去,等着响水壶自己把水煮熟了以后通知他。-异步非阻塞

老王豁然,这下感觉轻松了很多。


  • 同步和异步

    同步就是烧开水,需要自己去轮询(每隔一段时间去看看水开了没),异步就是水开了,然后水壶会通知你水已经开了,你可以回来处理这些开水了。
    同步和异步是相对于操作结果来说,会不会等待结果返回。

  • 阻塞和非阻塞

    阻塞就是说在煮水的过程中,你不可以去干其他的事情,非阻塞就是在同样的情况下,可以同时去干其他的事情。阻塞和非阻塞是相对于线程是否被阻塞。

   同步:所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。也就是必须一件一件事做,等前一件做完了才能做下一件事。

     异步:异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

     阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。

     非阻塞:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。

Java future

减少主函数的等待时间,使得原本需要等待的时间段可以处理其它事情

1、Executors创建线程池的几种常见方式

     通过Executors可以创建不同类似的线程池,常见的大概有下表几种类型,还有些可能未被列出。在实际应用中,个人感觉主要使用newCachedThreadPool和newFixedThreadPool来创建线程池。

类名 说明
newCachedThreadPool 缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse;如果没有,就建一个新的线程加入池中。缓存型池子通常用于执行一些生存期很短的异步型任务。因此在一些面向连接的daemon型SERVER中用得不多。能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout为60s,超过这个IDLE时长,线程实例将被终止并移出池子。注意:放入CachedThreadPool的线程超过TIMEOUT不活动,其会自动被终止。
newFixedThreadPool 和cacheThreadPool类似,有可用的线程就使用,但不能随时建新的线程。其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。cache池和fixed池调用的是同一个底层池,只不过参数不同:fixed池线程数固定,并且是0秒IDLE(无IDLE)。所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器。cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE。
ScheduledThreadPool 调度型线程池。这个池子里的线程可以按schedule依次delay执行,或周期执行。
SingleThreadExecutor 单例线程,任意时间池中只能有一个线程。用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)。



2、Executors创建线程池源码

//调用newCachedThreadPool方法,可以创建一个缓冲型线程池,而在改方法中通过传参创建一个ThreadPoolExecutor,我么你会很奇怪明明返回的是一个ExecutorService,怎么会创建了一个ThreadPoolExecutor呢?
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, 
                   TimeUnit.SECONDS, new SynchronousQueue<Runnable());
}

// ThreadPoolExecutor继承了抽象的service类AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {}

//AbstractExecutorService实现了ExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {}

//所以ExecutorService其实是ThreadPoolExecutor的基类,这也就解释清楚了

3、ExecutorService(线程池)

     ExecutorService是一个接口,它继承了Executor,在原有execute方法的基础上新增了submit方法,传入一个任务,该方法能够返回一个Future对象,可以获取异步计算结果。


//ExecutorService继承了Executor,并扩展了新方法。
public interface ExecutorService extends Executor { }

//Executor中的方法
void execute(Runnable command);

//增加了submit方法,该方法传任务来获取Future对象,而Future对象中可以获取任务的执行结果
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

4、Future(获取异步计算结果)

     Future接口中有下表所示方法,可以获取当前正在执行的任务相关信息。

方法 说明
boolean cancel(boolean interruptIf) 取消任务的执行
boolean isCancelled() 任务是否已取消,任务正常完成前将其取消,返回 true
boolean isDone() 任务是否已完成,任务正常终止、异常或取消,返回true
V get() 等待任务结束,然后获取V类型的结果
V get(long timeout, TimeUnit unit) 获取结果,设置超时时间

5、FutureTask

       Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

       FutureTask包装了Callable和Runnable接口对象,提供对Future接口的基本实现,开始、取消计算、查询计算是否完成、获取计算结果。仅当计算完成时才能检索结果,当计算没有完成时,该方法会一直阻塞直到任务转入完成状态。一旦完成计算,不能够重新开始或取消计算。通过Excutor(线程池)来执行,也可传递给Thread对象执行。

        如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

//通过传入任务来构造FutureTask
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}

//FutureTask中同样有获取当前任务状态的方法
public boolean isCancelled(){}
public boolean isDone() {}
public boolean cancel(boolean mayInterruptIfRunning) {}

//FutureTask实现RunnableFuture
public class FutureTask<V> implements RunnableFuture<V> {}

//RunnableFuture继承Runnable和Future
public interface RunnableFuture<V> extends Runnable, Future<V> 
6、示例代码
package guava.future;

import java.util.Random;
import java.util.concurrent.*;

public class FutureTest {

    // 创建线程池
    final static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Long t1 = System.currentTimeMillis();

        // 任务1
        Future<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        while (true) {
            if (booleanTask.isDone() && !booleanTask.isCancelled()) {
                //模拟耗时
                Thread.sleep(500);
                Boolean result = booleanTask.get();
                System.err.println("BooleanTask: " + result);
                break;
            }
        }

        // 任务2
        Future<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        while (true) {
            if (stringTask.isDone() && !stringTask.isCancelled()) {
                String result = stringTask.get();
                System.err.println("StringTask: " + result);
                break;
            }
        }



        // 任务3
        Future<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        while (true) {
            if (integerTask.isDone() && !integerTask.isCancelled()) {
                Integer result = integerTask.get();
                System.err.println("IntegerTask: " + result);
                break;
            }
        }

        // 执行时间
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}

返回结果

BooleanTask: true
StringTask: Hello World
IntegerTask: 29
time: 508

Guava future

减少主函数的等待时间,使得多任务能够异步非阻塞执行


        ListenableFuture顾名思义就是可以监听的Future,它是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用ListenableFuture Guava帮我们检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

        ListenableFuture是一个接口,它从jdk的Future接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

接口

传统JDK中的Future通过异步的方式计算返回结果:
在多线程运算中可能或者可能在没有结束返回结果,
Future是运行中的多线程的一个引用句柄,确保在服务执行返回一个Result。


ListenableFuture可以允许你注册回调方法(callbacks),
在运算(多线程执行)完成的时候进行调用,  或者在运算(多线程执行)完成后立即执行。
这样简单的改进,使得可以明显的支持更多的操作,
这样的功能在JDK concurrent中的Future是不支持的。

ListenableFuture 中的基础方法是addListener(Runnable, Executor),
 该方法会在多线程运算完的时候,指定的Runnable参数传入的对象会被指定的Executor执行。

        我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例。

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("call execute..");
                TimeUnit.SECONDS.sleep(1);
                return 7;
            }
        });
       
        首先通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后使用此实例的submit方法即可初始化ListenableFuture对象。

        我们上文中定义的ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字7.


        有了ListenableFuture实例,有两种方法可以执行此Future并执行Future完成之后的回调函数。


方法一:通过ListenableFuture的addListener方法


   listenableFuture.addListener(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("get listenable future's result " + listenableFuture.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);
     

            方法二:通过Futures的静态方法addCallback给ListenableFuture添加回调函数

添加回调(Callbacks)

多数用户喜欢使用 
Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)的方式, 
或者 addCallback(ListenableFuture<V> future,FutureCallback<? super V> callback),
默认是采用 MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,
Callback采用轻量级的设计.  FutureCallback<V> 中实现了两个方法:

onSuccess(V),
在Future成功的时候执行,根据Future结果来判断。
onFailure(Throwable), 
在Future失败的时候执行,根据Future结果来判断。


  Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                System.out.println("get listenable future's result with callback " + result);
            }


            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        });

        推荐使用第二种方法,因为第二种方法可以直接得到Future的返回值,或者处理错误情况。本质上第二种方法是通过调动第一种方法实现的,做了进一步的封装。

另外ListenableFuture还有其他几种内置实现:

        SettableFuture:不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,可以通过程序设置此Future的返回值或者异常信息

        CheckedFuture: 这是一个继承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future执行发生异常时,可以抛出指定类型的异常。

Guava也提供了 CheckedFuture<V, X extends Exception> 接口。
CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,
方法声明抛出检查异常.
这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易 。
将 ListenableFuture 转换成CheckedFuture,
可以使用 Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)。

示例:

package guava.future;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class GuavaFutureTest {

    // 创建线程池
    final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    public static void main(String[] args) throws Exception {
        Long t1 = System.currentTimeMillis();
        // 任务1
        ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        Futures.addCallback(booleanTask, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(Boolean result) {
                System.err.println("BooleanTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任务2
        ListenableFuture<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        Futures.addCallback(stringTask, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.err.println("StringTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任务3
        ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        Futures.addCallback(integerTask, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println("IntegerTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 执行时间
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }



}

返回结果

BooleanTask: true
StringTask: Hello World
time: 65
IntegerTask: 37

Java future 和 Guava future的对比

        Future 具有局限性。在实际应用中,当需要下载大量图片或视频时,可以使用多线程去下载,提交任务下载后,可以从多个Future中获取下载结果,由于Future获取任务结果是阻塞的,所以将会依次调用Future.get()方法,这样的效率会很低。很可能第一个下载速度很慢,则会拖累整个下载速度。

        Future主要功能在于获取任务执行结果和对异步任务的控制。但如果要获取批量任务的执行结果,从上面的例子我们已经可以看到,单使用 Future 是很不方便的。其主要原因在于:一方面是没有好的方法去判断第一个完成的任务;另一方面是 Future的get方法 是阻塞的,使用不当会造成线程的浪费。第一个问题可以用 CompletionService 解决,CompletionService 提供了一个 take() 阻塞方法,用以依次获取所有已完成的任务。第二个问题可以用 Google Guava 库所提供的 ListeningExecutorService 和 ListenableFuture 来解决。除了获取批量任务执行结果时不便,Future另外一个不能做的事便是防止任务的重复提交。要做到这件事就需要 Future 最常见的一个实现类 FutureTask 了。Future只实现了异步,而没有实现回调,主线程get时会阻塞,可以轮询以便获取异步调用是否完成。

        在实际的使用中建议使用Guava ListenableFuture来实现异步非阻塞,目的就是多任务异步执行,通过回调的方方式来获取执行结果而不需轮询任务状态


参考链接:https://www.cnblogs.com/George1994/p/6702084.html

参考链接:https://blog.csdn.net/pistolove/article/details/51232004

参考链接:http://ifeve.com/google-guava-listenablefuture/

猜你喜欢

转载自blog.csdn.net/fly910905/article/details/80861296