文章目录
并发编程中,我们通常会用到一组非阻塞的模型:Future 和 Callback、Promise。其中的 Future 表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而Promise交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。 这一套模型是很多异步非阻塞架构的基础。
JDK中实现了Future和CallBack的模式,并没有Promise以及实现。Netty有类似“调用者和执行部件以异步的方式交互通信结果”的需求(要知道eventloop本质上是一个ScheduledExecutorService,ExecutorService是一种“提交-执行”模型实现,也存在线程间异步方式通信和线程安全问题),所以Netty实现了一套完整的Futre 和 Callback、Promise架构。
关于Future以及FutureTask的原理请见并发编程之Future源码解析
1. Future&Promise
也许你已经使用过JDK的Future对象,该接口的方法如下:
// 取消异步操作
boolean cancel(boolean mayInterruptIfRunning);
// 异步操作是否取消
boolean isCancelled();
// 异步操作是否完成,正常终止、异常、取消都是完成
boolean isDone();
// 阻塞直到取得异步操作结果
V get() throws InterruptedException, ExecutionException;
// 同上,但最长阻塞时间为timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
接口中只有isDone()方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK文档指出如果任务正在执行过程中返回false,正常终止、抛出异常、用户取消都会使isDone()方法返回true,其实这是JDK接口的约定。在我们的使用中,我们极有可能是对这三种情况分别处理,而JDK这样的设计不能满足我们的需求。
对于一个异步操作,我们更关心的是这个异步操作触发或者结束后能否再执行一系列动作。Netty扩展了JDK的Future接口,使其能解决上面的问题。扩展的方法如下:
// 异步操作完成且正常终止
boolean isSuccess();
// 异步操作是否可以取消
boolean isCancellable();
// 异步操作失败的原因
Throwable cause();
// 添加一个监听者,异步操作完成时回调,类比javascript的回调函数
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到异步操作完成
Future<V> await() throws InterruptedException;
// 同上,但异步操作失败时抛出异常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回异步结果,如果尚未完成返回null
V getNow();
Netty中future的特点:
- 操作状态分为success,fail,canceled三种。
- 并且通过addlisteners()方法可以添加回调操作,即异常、取消触发或者完成时需要进行的操作,类似于js中的callBack()。
- await()和sync(),可以以阻塞的方式等待异步完成;getnow()可以获得异步操作的结果,如果还未完成则返回Null。
future只有两种执行结果状态,unconpleted和conpleted,每种状态对应方法的返回值如下图
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------------------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
Future接口中的方法都是getter方法而没有setter方法,也就是说这样实现的Future子类的状态是不可变的,如果我们想要变化,那该怎么办呢?Netty提供的解决方法是:使用可写的Future即Promise。Promise接口扩展的方法如下:
// 标记异步操作结果为成功,如果已被设置(不管成功还是失败)则抛出异常IllegalStateException
Promise<V> setSuccess(V result);
// 同上,只是结果已被设置时返回False
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置结果为不可取消,结果已被取消返回False
boolean setUncancellable();
需要注意的是:Promise接口继承自Future接口,它提供的setter方法与常见的setter方法大为不同。Promise从Uncompleted–>Completed的状态转变有且只能有一次,也就是说setSuccess和setFailure方法最多只会成功一个,此外,在setSuccess和setFailure方法中会通知注册到其上的监听者。
至此,我们从总体上了解了Future和Promise的原理。我们再看一下类图:
2. AbstractFuture
AbstractFuture主要实现Future的get()方法,取得Future关联的异步操作结果,
protected EventExecutor executor() {
return executor;
}
@Override
public V get() throws InterruptedException, ExecutionException {
//阻塞直到异步任务完成
await();
Throwable cause = cause();
if (cause == null) {
//获得异步操作结果
return getNow();
}
//操作失败则抛出异常
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public boolean isDone() {
return true;
}
@Override
public Future<V> sync() throws InterruptedException {
return this;
}
3.Completefuture
completedfuture表示已经完成异步操作,该类在异步操作结束时创建,用户使用addlistener()方法提供异步操作方法。Completefuture持有执行任务的线程,用来执行listener中的任务。Completefuture表示已经完成异步操作,所以isdone()方法返回true;并且sync()方法和await()方法会立即返回。
private final EventExecutor executor;
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}
protected EventExecutor executor() {
return executor;
}
@Override
public boolean isDone() {
return true;
}
@Override
public Future<V> sync() throws InterruptedException {
return this;
}
我们再看addListener()和removeListener()方法:
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
DefaultPromise.notifyListener(executor(), this, listener);
return this;
}
addListener内部调用DefaultPromise的静态方法,关于这部分内容在DefaultPromise中讲解,Completefuture是表示完成的异步结果,大致是将listener添加到任务队列,交给EventExecutor去执行。
Completefuture类总结:
- Conpletefuture中保存了eventexecutor的信息,用来执行listener中的任务。
- 调用了future的addlistener()方法后,将listener中的操作封装成runnble任务扔到eventexecutor中的任务队列中等待执行。
- Completefuture表示已经完成异步操作,状态是isdone。
- GenericFutureListener 继承了JDK提供的空接口EventListener,内部只有一个方法operationComplete。
4.Channelfuture&Completechannelfuture
Channelfuture继承future的接口,顾名思义,该接口与通道操作有关,所以在channelfuture接口中,除了覆盖future的功能外,只提供了一个channel()抽象方法。
Channel channel();
Completechannelfuture类其实都可以猜出来,实现Channelfuture接口,继承Completefuture类。
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
尖括号中的泛型表示Future关联的结果,此结果为Void,意味着CompleteChannelFuture不关心这个特定结果即get()相关方法返回null。也就是说,我们可以将CompleteChannelFuture纯粹的视为一种回调函数机制。
下面是它的成员变量和构造函数:
private final Channel channel;
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
super(executor);
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
CompleteChannelFuture的大部分方法实现中,只是将方法返回的Future覆盖为ChannelFuture对象(ChannelFuture接口的要求),executor()方法:
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
如果父类中eventexecutor不为空,则返回父类中的eventexecutor,否则返回channel中保存的eventexecutor。
5.Succeededchannelfuture/FailedChannelFuture
Succeeded/FailedChannelFuture为特定的两个异步操作结果,回忆Future状态的讲解:
Succeeded: isSuccess() = true, cause() = null;
Failed: isSuccess() = false, cause() = non-null
代码也是写死的:
#FailedChannelFuture
@Override
public Throwable cause() {
return cause;
}
#Succeededchannelfuture
@Override
public boolean isSuccess() {
return true;
}
6.DefaultPromise
DefaultPromise继承了AbstractFuture,同时实现了Promise接口。看其中的static字段:
// 可以嵌套的Listener的最大层数,可见最大值为8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");;
// 此处的Signal是Netty定义的类,继承自Error,异步操作成功且结果为null时设置为改值
private static final Signal SUCCESS = new Object();
private static final Signal UNCANCELLABLE = new Object();
// 异步操作失败时保存异常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(...);
嵌套的Listener,是指在listener的operationComplete方法中,可以再次使用future.addListener()继续添加listener,Netty限制的最大层数是8,用户可使用系统变量io.netty.defaultPromise.maxListenerStackDepth设置。
再看其中的私有字段:
// 异步操作结果
private volatile Object result;
// 执行listener操作的执行器
private final EventExecutor executor;
// 监听者
private Object listeners;
// 阻塞等待该结果的线程数
private short waiters;
// 通知正在进行标识
private boolean notifyingListeners;
listeners是一个Object类型。这似乎不合常理,一般情况下我们会使用一个集合或者一个数组。Netty之所以这样设计,是因为大多数情况下listener只有一个,用集合和数组都会造成浪费。当只有一个listener时,该字段为一个GenericFutureListener对象;当多余一个listener时,该字段为DefaultFutureListeners,可以储存多个listener。内部是GenericFutureListener数组,我们分析关键方法addListener():
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener); // 保证多线程情况下只有一个线程执行添加操作
}
if (isDone()) {
notifyListeners(); // 异步操作已经完成通知监听者
}
return this;
}
从代码中可以看出,在添加Listener时,如果异步操作已经完成,则会notifyListeners():
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener; // 只有一个
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener); // 大于两个
} else {
// 从一个扩展为两个
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
在添加Listener时,如果异步操作已经完成,则会notifyListeners():
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) { //执行线程为指定线程
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth(); // 嵌套层数
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 执行前增加嵌套层数
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 执行完毕,无论如何都要回滚嵌套层数
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 外部线程则提交任务给执行线程
safeExecute(executor, () -> { notifyListenersNow(); });
}
如果是外部线程,则只能提交任务到指定Executor,其中的操作最终由指定Executor执行。不管哪个线程都会执行notifyListenersNow()方法:
private void notifyListenersNow() {
Object listeners;
// 此时外部线程可能会执行添加Listener操作,所以需要同步
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
// 正在通知或已没有监听者(外部线程删除)直接返回
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) { // 通知多个
notifyListeners0((DefaultFutureListeners) listeners);
} else { // 通知单个
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
// 执行完毕且外部线程没有再添加监听者
if (this.listeners == null) {
notifyingListeners = false;
return;
}
// 外部线程添加了监听者继续执行
listeners = this.listeners;
this.listeners = null;
}
}
}
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
在该方法中,首先将Listeners取出来,然后将其清空(每次触发完listeners都会将原来的listeners清空),然后执行listener中具体的操作,执行完操作,会再次检查是否又有listeners添加进来,确保无误后,从方法中退出。
分析完了Promise最重要的addListener()和notifyListener()方法,在源码中还有static的notifyListener()方法,这些方法是CompleteFuture使用的,对于CompleteFuture,添加监听者的操作判断任务是否完成,直接执行Listener中的方法即可。
那么,当我们添加完Listener后,什么时候会执行,当然除了addListener方法中isDone为true是一种特殊情况,正常情况下,不会为true,Netty中用通知的方式来对后续的listener中的操作,操作结果等进行控制。通知的前提包括success,fail,cancel三种状态。三个操作至多只能调用一个且同一个方法至多生效一次,再次调用会抛出异常(set)或返回失败(try)。这些设置方法原理相同,我们以setSuccess()为例分析:
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners(); // 可以设置结果说明异步操作已完成,故通知监听者,就是上面分析的方法
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 为空设置为Signal对象SUCCESS(Object)
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// 只有结果为null或者UNCANCELLABLE时才可设置且只可以设置一次
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters(); // 通知等待的线程
return true;
}
return false;
}
checkNotifyWaiters()方法唤醒调用await()和sync()方法等待该异步操作结果的线程,代码如下:
private synchronized void checkNotifyWaiters() {
// 确实有等待的线程才notifyAll
if (waiters > 0) {
notifyAll(); // JDK方法
}
}
有了唤醒操作,那么sync()和await()的实现是怎么样的呢?sync()n内部调用了await(),sync()方法是在await()方法的基础上添加了额外的功能,区别只是sync()调用,如果异步操作失败,则会抛出异常,进入await方法:
public Promise<V> await() throws InterruptedException {
// 异步操作已经完成,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 同步使修改waiters的线程只有一个
synchronized (this) {
while (!isDone()) { // 等待直到异步操作完成
incWaiters(); // ++waiters;
try {
wait(); // JDK方法
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}
checkDeadLock()方法用来进行死锁检测:
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
不能在同一个线程中调用await()相关的方法。一般执行任务的是EventExecutor,同步方法checkNotifyWaiters中调用了notifyAll释放锁,当调用await,则会一直卡在这里,造成了死锁。