/** * 异步操作回调类,当某项操作异步执行时(由另外的线程执行),当前线程当即返回DefaultPromise * DefaultPromise.addListener 方法可以添加监听器,当操作完成时被触发 * 或者调用DefaultPromise.sync()阻塞当前线程,等待操作完成。 **/ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { private volatile Object result; //结果值 private final EventExecutor executor; /** * 监听器对象,可能是一个或多个监听器 */ private Object listeners; /** * 设置操作成功并通知监听器,如设置失败,抛出异常 * **/ public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } /** * 设置操作成功并通知监听器,如设置失败,则返回false * **/ public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; } /** * 设置操作失败并通知监听器,如设置失败,抛出异常 * */ public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this, cause); } /** * 设置操作失败并通知监听器,如设置失败,返回false */ public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; } /** * 操作是否成功 */ public boolean isSuccess() { Object result = this.result; return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); } /** * 操作是否可以取消 */ public boolean isCancellable() { return result == null; } /** * 查年操作的抛出的异常 */ public Throwable cause() { Object result = this.result; return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null; } //添加或移除监听器 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {} public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {} public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {} public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {} /** * 阻塞当前线程,等待异步任务操作完成 * **/ public Promise<V> await() throws InterruptedException { if (isDone()) { return this; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } checkDeadLock(); synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; } /** * 阻塞当前线程,等待异步任务操作完成,接受中断请求 * **/ public Promise<V> awaitUninterruptibly() { if (isDone()) { return this; } checkDeadLock(); boolean interrupted = false; synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } catch (InterruptedException e) { // Interrupted while waiting. interrupted = true; } finally { decWaiters(); } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; } /** * 阻塞当前线程,等待异步任务操作完成,如果在等待时间花费完成之后,异步任务还未完成则不在等待(退出阻塞) * **/ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } /** * 阻塞当前线程,等待异步任务操作完成,如果在等待时间花费完成之后,异步任务还未完成则不在等待(退出阻塞) * **/ public boolean await(long timeoutMillis) throws InterruptedException { return await0(MILLISECONDS.toNanos(timeoutMillis), true); } /** * 获取异步操作的结果值 **/ public V getNow() { Object result = this.result; if (result instanceof CauseHolder || result == SUCCESS) { return null; } return (V) result; } /** * 任务是否被取消 **/ public boolean isCancelled() { return isCancelled0(result); } /** * 异步任务是否执行完成 **/ public boolean isDone() { return isDone0(result); } /** * 阻塞直至异步任务完成,或抛出异步任务出错的异常 */ public Promise<V> sync() throws InterruptedException { await(); rethrowIfFailed(); return this; } /** * 阻塞直至异步任务完成,或抛出异步任务出错的异常,支持中断 */ public Promise<V> syncUninterruptibly() { awaitUninterruptibly(); rethrowIfFailed(); return this; } /** * Notify a listener that a future has completed. * <p> * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded. * @param eventExecutor the executor to use to notify the listener {@code listener}. * @param future the future that is complete. * @param listener the listener to notify. */ protected static void notifyListener( EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) { notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener); } /** * 通知监听器 **/ private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); } //ignore some code }
Netty4.1源码 :DefaultPromise
猜你喜欢
转载自java12345678.iteye.com/blog/2356093
今日推荐
周排行