博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty中的ChannelFuture和ChannelPromise
阅读量:5332 次
发布时间:2019-06-14

本文共 26144 字,大约阅读时间需要 87 分钟。

在Netty使用ChannelFuture和ChannelPromise进行异步操作的处理

这是官方给出的ChannelFutur描述

1  *                                      | Completed successfully    | 2  *                                      +---------------------------+ 3  *                                 +---->      isDone() = true      | 4  * +--------------------------+    |    |   isSuccess() = true      | 5  * |        Uncompleted       |    |    +===========================+ 6  * +--------------------------+    |    | Completed with failure    | 7  * |      isDone() = false    |    |    +---------------------------+ 8  * |   isSuccess() = false    |----+---->      isDone() = true      | 9  * | isCancelled() = false    |    |    |       cause() = non-null  |10  * |       cause() = null     |    |    +===========================+11  * +--------------------------+    |    | Completed by cancellation |12  *                                 |    +---------------------------+13  *                                 +---->      isDone() = true      |14  *                                      | isCancelled() = true      |15  *                                      +---------------------------+

由图可以知道ChannelFutur有四种状态:Uncompleted、Completed successfully、Completed with failure、Completed by cancellation,这几种状态是由isDone、isSuccess、isCancelled、cause这四种方法的返回值决定的。

 

ChannelFutur接口的定义如下:

1 public interface ChannelFuture extends Future
{ 2 Channel channel(); 3 4 ChannelFuture addListener(GenericFutureListener
> var1); 5 6 ChannelFuture addListeners(GenericFutureListener... var1); 7 8 ChannelFuture removeListener(GenericFutureListener
> var1); 9 10 ChannelFuture removeListeners(GenericFutureListener... var1);11 12 ChannelFuture sync() throws InterruptedException;13 14 ChannelFuture syncUninterruptibly();15 16 ChannelFuture await() throws InterruptedException;17 18 ChannelFuture awaitUninterruptibly();19 20 boolean isVoid();21 }

继承自Netty的Future:

1 public interface Future
extends java.util.concurrent.Future
{ 2 boolean isSuccess(); 3 4 boolean isCancellable(); 5 6 Throwable cause(); 7 8 Future
addListener(GenericFutureListener
> var1); 9 10 Future
addListeners(GenericFutureListener... var1);11 12 Future
removeListener(GenericFutureListener
> var1);13 14 Future
removeListeners(GenericFutureListener... var1);15 16 Future
sync() throws InterruptedException;17 18 Future
syncUninterruptibly();19 20 Future
await() throws InterruptedException;21 22 Future
awaitUninterruptibly();23 24 boolean await(long var1, TimeUnit var3) throws InterruptedException;25 26 boolean await(long var1) throws InterruptedException;27 28 boolean awaitUninterruptibly(long var1, TimeUnit var3);29 30 boolean awaitUninterruptibly(long var1);31 32 V getNow();33 34 boolean cancel(boolean var1);35 }

 

Netty的Future又继承自JDK的Future:

1 public interface Future
{ 2 3 boolean cancel(boolean mayInterruptIfRunning); 4 5 boolean isCancelled(); 6 7 boolean isDone(); 8 9 V get() throws InterruptedException, ExecutionException;10 11 V get(long timeout, TimeUnit unit)12 throws InterruptedException, ExecutionException, TimeoutException;13 }

ChannelPromise继承了ChannelFuture:

1 public interface ChannelPromise extends ChannelFuture, Promise
{ 2 Channel channel(); 3 4 ChannelPromise setSuccess(Void var1); 5 6 ChannelPromise setSuccess(); 7 8 boolean trySuccess(); 9 10 ChannelPromise setFailure(Throwable var1);11 12 ChannelPromise addListener(GenericFutureListener
> var1);13 14 ChannelPromise addListeners(GenericFutureListener... var1);15 16 ChannelPromise removeListener(GenericFutureListener
> var1);17 18 ChannelPromise removeListeners(GenericFutureListener... var1);19 20 ChannelPromise sync() throws InterruptedException;21 22 ChannelPromise syncUninterruptibly();23 24 ChannelPromise await() throws InterruptedException;25 26 ChannelPromise awaitUninterruptibly();27 28 ChannelPromise unvoid();29 }

其中Promise接口定义如下:

1 public interface Promise
extends Future
{ 2 Promise
setSuccess(V var1); 3 4 boolean trySuccess(V var1); 5 6 Promise
setFailure(Throwable var1); 7 8 boolean tryFailure(Throwable var1); 9 10 boolean setUncancellable();11 12 Promise
addListener(GenericFutureListener
> var1);13 14 Promise
addListeners(GenericFutureListener... var1);15 16 Promise
removeListener(GenericFutureListener
> var1);17 18 Promise
removeListeners(GenericFutureListener... var1);19 20 Promise
await() throws InterruptedException;21 22 Promise
awaitUninterruptibly();23 24 Promise
sync() throws InterruptedException;25 26 Promise
syncUninterruptibly();27 }

在Netty中,无论是服务端还是客户端,在Channel注册时都会为其绑定一个ChannelPromise,默认实现是DefaultChannelPromise

DefaultChannelPromise定义如下:

1 public class DefaultChannelPromise extends DefaultPromise
implements ChannelPromise, FlushCheckpoint { 2 3 private final Channel channel; 4 private long checkpoint; 5 6 public DefaultChannelPromise(Channel channel) { 7 this.channel = checkNotNull(channel, "channel"); 8 } 9 10 public DefaultChannelPromise(Channel channel, EventExecutor executor) { 11 super(executor); 12 this.channel = checkNotNull(channel, "channel"); 13 } 14 15 @Override 16 protected EventExecutor executor() { 17 EventExecutor e = super.executor(); 18 if (e == null) { 19 return channel().eventLoop(); 20 } else { 21 return e; 22 } 23 } 24 25 @Override 26 public Channel channel() { 27 return channel; 28 } 29 30 @Override 31 public ChannelPromise setSuccess() { 32 return setSuccess(null); 33 } 34 35 @Override 36 public ChannelPromise setSuccess(Void result) { 37 super.setSuccess(result); 38 return this; 39 } 40 41 @Override 42 public boolean trySuccess() { 43 return trySuccess(null); 44 } 45 46 @Override 47 public ChannelPromise setFailure(Throwable cause) { 48 super.setFailure(cause); 49 return this; 50 } 51 52 @Override 53 public ChannelPromise addListener(GenericFutureListener
> listener) { 54 super.addListener(listener); 55 return this; 56 } 57 58 @Override 59 public ChannelPromise addListeners(GenericFutureListener
>... listeners) { 60 super.addListeners(listeners); 61 return this; 62 } 63 64 @Override 65 public ChannelPromise removeListener(GenericFutureListener
> listener) { 66 super.removeListener(listener); 67 return this; 68 } 69 70 @Override 71 public ChannelPromise removeListeners(GenericFutureListener
>... listeners) { 72 super.removeListeners(listeners); 73 return this; 74 } 75 76 @Override 77 public ChannelPromise sync() throws InterruptedException { 78 super.sync(); 79 return this; 80 } 81 82 @Override 83 public ChannelPromise syncUninterruptibly() { 84 super.syncUninterruptibly(); 85 return this; 86 } 87 88 @Override 89 public ChannelPromise await() throws InterruptedException { 90 super.await(); 91 return this; 92 } 93 94 @Override 95 public ChannelPromise awaitUninterruptibly() { 96 super.awaitUninterruptibly(); 97 return this; 98 } 99 100 @Override101 public long flushCheckpoint() {102 return checkpoint;103 }104 105 @Override106 public void flushCheckpoint(long checkpoint) {107 this.checkpoint = checkpoint;108 }109 110 @Override111 public ChannelPromise promise() {112 return this;113 }114 115 @Override116 protected void checkDeadLock() {117 if (channel().isRegistered()) {118 super.checkDeadLock();119 }120 }121 122 @Override123 public ChannelPromise unvoid() {124 return this;125 }126 127 @Override128 public boolean isVoid() {129 return false;130 }131 }

可以看到这个DefaultChannelPromise仅仅是将Channel封装了,而且其基本上所有方法的实现都依赖于父类DefaultPromise

DefaultPromise中的实现是整个ChannelFuture和ChannelPromise的核心所在:

DefaultPromise中有如下几个状态量:

1 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,2             SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));3 private static final Object SUCCESS = new Object();4 private static final Object UNCANCELLABLE = new Object();5 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(6             new CancellationException(), DefaultPromise.class, "cancel(...)"));7 private static final AtomicReferenceFieldUpdater
RESULT_UPDATER =8 AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

MAX_LISTENER_STACK_DEPTH: 表示最多可执行listeners的数量,默认是8

SUCCESS :表示异步操作正常完成
UNCANCELLABLE:表示异步操作不可取消,并且尚未完成
CANCELLATION_CAUSE_HOLDER:表示异步操作取消监听,用于cancel操作,
而CauseHolder 的实例对象是用来表示异步操作异常结束,同时保存异常信息:

1 private static final class CauseHolder {2     final Throwable cause;3     CauseHolder(Throwable cause) {4         this.cause = cause;5     }6 }

RESULT_UPDATER:是一个原子更新器,通过CAS操作,原子化更新 DefaultPromise对象的名为result的成员,这个result成员是其异步操作判断的关键所在

DefaultPromise的成员及构造方法定义:

1 public class DefaultPromise
extends AbstractFuture
implements Promise
{ 2 private volatile Object result; 3 private final EventExecutor executor; 4 private Object listeners; 5 private short waiters; 6 private boolean notifyingListeners; 7 8 public DefaultPromise(EventExecutor executor) { 9 this.executor = checkNotNull(executor, "executor");10 }11 }

result:就是前面说的,判断异步操作状态的关键

result的取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及null (其实还可以是泛型V类型的任意对象,这里暂不考虑)
executor:就是Channel绑定的NioEventLoop,在我之前的博客说过,Channel的异步操作都是在NioEventLoop的线程中完成的([Netty中NioEventLoopGroup的创建源码分析](https://blog.csdn.net/Z_ChenChen/article/details/90567863))
listeners:通过一个Object保存所有对异步操作的监听,用于异步操作的回调
waiters:记录阻塞中的listeners的数量
notifyingListeners:是否需要唤醒的标志

首先来看isDone方法,通过之前的图可以知道,

isDone为false对应了Uncompleted状态,即异步操作尚未完成;
isDone为true则代表了异步操作完成,但是还是有三种完成情况,需要结合别的判断方法才能具体知道是哪种情况;

isDone方法:

1 @Override2 public boolean isDone() {3     return isDone0(result);4 }

调用isDone0:

1 private static boolean isDone0(Object result) {2     return result != null && result != UNCANCELLABLE;3 }

有如下几种情况:

result等于null,result没有赋值,表示异步操作尚未完成(从这里就能想到异步操作完成,需要调用某个set方法来改变result的状态)
result是UNCANCELLABLE状态,表示执行中的异步操作不可取消,当然也就是异步操作尚未完成
result不等于null,且不等于UNCANCELLABLE,就表示异步操作完成(包括正常完成,以及异常结束,需要由cause方法进一步判断)

isSuccess方法:

1 @Override2 public boolean isSuccess() {3     Object result = this.result;4     return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);5 }

由这里可以知道当且仅当result 为SUCCESS状态时,才返回true(其余除UNCANCELLABLE和null的值其实也可以,这里暂不考虑)

isCancelled方法:

1 @Override2 public boolean isCancelled() {3     return isCancelled0(result);4 }

调用isCancelled0方法:

1 private static boolean isCancelled0(Object result) {2     return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;3 }

只有当result是CancellationException的实例时,表示取消异步操作

 

接着来看cause方法:

1 @Override2 public Throwable cause() {3     Object result = this.result;4     return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;5 }

和上面同理,通过判别resul是否是CauseHolder的实现类,若是,将CauseHolder保存的异常返回。

几种状态的判别说完了,下面看一下如何设置这几种状态的:

setSuccess方法:

1 @Override2 public Promise
setSuccess(V result) {3 if (setSuccess0(result)) {4 notifyListeners();5 return this;6 }7 throw new IllegalStateException("complete already: " + this);8 }

首先调用setSuccess0方法,其中result的泛型通过DefaultChannelPromise可知是Void,在DefaultChannelPromise中所有的set和try操作参数都是null,这里的result也就不去考虑:

1 private boolean setSuccess0(V result) {2     return setValue0(result == null ? SUCCESS : result);3 }

继续调用setValue0方法:

1 private boolean setValue0(Object objResult) {2     if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||3         RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {4         checkNotifyWaiters();5         return true;6     }7     return false;8 }

通过CAS操作,将result状态变为SUCCESS

其中checkNotifyWaiters方法:

1 private synchronized void checkNotifyWaiters() {2     if (waiters > 0) {3         notifyAll();4     }5 }

检查waiters的个数,唤醒所有阻塞中的this,sync方法会引起阻塞

 

回到setSuccess方法中,setSuccess0通过CAS操作,将result状态更新为SUCCESS成功后,调用

notifyListeners方法,唤醒所有listener完成对异步操作的回调

listeners是通过addListener方法添加的,用来对异步操作进行侦听:

看到addListener方法:

1 @Override 2 public Promise
addListener(GenericFutureListener
> listener) { 3 checkNotNull(listener, "listener"); 4 5 synchronized (this) { 6 addListener0(listener); 7 } 8 9 if (isDone()) {10 notifyListeners();11 }12 13 return this;14 }15 16 @Override17 public Promise
addListeners(GenericFutureListener
>... listeners) {18 checkNotNull(listeners, "listeners");19 20 synchronized (this) {21 for (GenericFutureListener
> listener : listeners) {22 if (listener == null) {23 break;24 }25 addListener0(listener);26 }27 }28 29 if (isDone()) {30 notifyListeners();31 }32 33 return this;34 }

其中GenericFutureListener接口定义如下:

1 public interface GenericFutureListener
> extends EventListener {2 /**3 * Invoked when the operation associated with the {
@link Future} has been completed.4 *5 * @param future the source {
@link Future} which called this callback6 */7 void operationComplete(F future) throws Exception;8 }

可以看到listener其实就是通过operationComplete方法,来完成回调,对Future对象进行处理,由注释可知operationComplete方法是在future操作完成时调用

addListeners方法的实现比较简单,实现核心是在addListener0中:

1 private void addListener0(GenericFutureListener
> listener) {2 if (listeners == null) {3 listeners = listener;4 } else if (listeners instanceof DefaultFutureListeners) {5 ((DefaultFutureListeners) listeners).add(listener);6 } else {7 listeners = new DefaultFutureListeners((GenericFutureListener
) listeners, listener);8 }9 }

其中DefaultFutureListeners是将GenericFutureListener对象封装的一个数组:

1 final class DefaultFutureListeners { 2  3     private GenericFutureListener
>[] listeners; 4 private int size; 5 private int progressiveSize; 6 7 @SuppressWarnings("unchecked") 8 DefaultFutureListeners( 9 GenericFutureListener
> first, GenericFutureListener
> second) {10 listeners = new GenericFutureListener[2];11 listeners[0] = first;12 listeners[1] = second;13 size = 2;14 if (first instanceof GenericProgressiveFutureListener) {15 progressiveSize ++;16 }17 if (second instanceof GenericProgressiveFutureListener) {18 progressiveSize ++;19 }20 }21 22 public void add(GenericFutureListener
> l) {23 GenericFutureListener
>[] listeners = this.listeners;24 final int size = this.size;25 if (size == listeners.length) {26 this.listeners = listeners = Arrays.copyOf(listeners, size << 1);27 }28 listeners[size] = l;29 this.size = size + 1;30 31 if (l instanceof GenericProgressiveFutureListener) {32 progressiveSize ++;33 }34 }35 36 public void remove(GenericFutureListener
> l) {37 final GenericFutureListener
>[] listeners = this.listeners;38 int size = this.size;39 for (int i = 0; i < size; i ++) {40 if (listeners[i] == l) {41 int listenersToMove = size - i - 1;42 if (listenersToMove > 0) {43 System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);44 }45 listeners[-- size] = null;46 this.size = size;47 48 if (l instanceof GenericProgressiveFutureListener) {49 progressiveSize --;50 }51 return;52 }53 }54 }55 56 public GenericFutureListener
>[] listeners() {57 return listeners;58 }59 60 public int size() {61 return size;62 }63 64 public int progressiveSize() {65 return progressiveSize;66 }67 }

size:记录listeners的个数

progressiveSize:记录GenericProgressiveFutureListener类型的listeners的个数
DefaultFutureListeners 中对数组的操作比较简单,
add方法,当size达到数组长度时,进行二倍扩容,

其中GenericProgressiveFutureListener继承自GenericFutureListener:

1 public interface GenericProgressiveFutureListener
> extends GenericFutureListener
{ 2 /** 3 * Invoked when the operation has progressed. 4 * 5 * @param progress the progress of the operation so far (cumulative) 6 * @param total the number that signifies the end of the operation when {
@code progress} reaches at it. 7 * {
@code -1} if the end of operation is unknown. 8 */ 9 void operationProgressed(F future, long progress, long total) throws Exception;10 }

由注释可知operationProgressed是在future操作进行时调用,这里不对GenericProgressiveFutureListener过多讨论

回到addListener0方法,由DefaultFutureListeners就可以知道,实际上通过DefaultFutureListeners管理的一维数组来保存listeners

在addListener方法完成对listener的添加后,还会调用isDone方法检查当前异步操作是否完成,若是完成需要调用notifyListeners,直接唤醒所有listeners完后对异步操作的回调

有add就有remove,removeListener方法:

1 @Override 2 public Promise
removeListener(final GenericFutureListener
> listener) { 3 checkNotNull(listener, "listener"); 4 5 synchronized (this) { 6 removeListener0(listener); 7 } 8 9 return this;10 }11 12 @Override13 public Promise
removeListeners(final GenericFutureListener
>... listeners) {14 checkNotNull(listeners, "listeners");15 16 synchronized (this) {17 for (GenericFutureListener
> listener : listeners) {18 if (listener == null) {19 break;20 }21 removeListener0(listener);22 }23 }24 25 return this;26 }

还是由removeListener0来实现:

1 private void removeListener0(GenericFutureListener
> listener) {2 if (listeners instanceof DefaultFutureListeners) {3 ((DefaultFutureListeners) listeners).remove(listener);4 } else if (listeners == listener) {5 listeners = null;6 }7 }

看过之前的内容在看这里就比较简单了,通过DefaultFutureListeners去删除

notifyListeners方法:

1 private void notifyListeners() { 2     EventExecutor executor = executor(); 3     if (executor.inEventLoop()) { 4         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); 5         final int stackDepth = threadLocals.futureListenerStackDepth(); 6         if (stackDepth < MAX_LISTENER_STACK_DEPTH) { 7             threadLocals.setFutureListenerStackDepth(stackDepth + 1); 8             try { 9                 notifyListenersNow();10             } finally {11                 threadLocals.setFutureListenerStackDepth(stackDepth);12             }13             return;14         }15     }16 17     safeExecute(executor, new Runnable() {18         @Override19         public void run() {20             notifyListenersNow();21         }22     });23 }

其中executor方法:

1 protected EventExecutor executor() {2     return executor;3 }

用来获取executor轮询线程对象

判断executor是否处于轮询,否则需要通过safeExecute方法处理listeners的侦听,

safeExecute方法:

1 private static void safeExecute(EventExecutor executor, Runnable task) {2     try {3         executor.execute(task);4     } catch (Throwable t) {5         rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);6     }7 }

这里保证了listeners的侦听回调是异步执行

InternalThreadLocalMap在我之前的博客中说过,是Netty使用的ThreadLocal ()

去线程本地变量中找futureListenerStackDepth(默认为0),判断stackDepth是否小于MAX_LISTENER_STACK_DEPTH,否则也要通过safeExecute方法处理listeners的侦听

核心都是调用notifyListenersNow方法:

1 private void notifyListenersNow() { 2     Object listeners; 3     synchronized (this) { 4         // Only proceed if there are listeners to notify and we are not already notifying listeners. 5         if (notifyingListeners || this.listeners == null) { 6             return; 7         } 8         notifyingListeners = true; 9         listeners = this.listeners;10         this.listeners = null;11     }12     for (;;) {13         if (listeners instanceof DefaultFutureListeners) {14             notifyListeners0((DefaultFutureListeners) listeners);15         } else {16             notifyListener0(this, (GenericFutureListener
) listeners);17 }18 synchronized (this) {19 if (this.listeners == null) {20 // Nothing can throw from within this method, so setting notifyingListeners back to false does not21 // need to be in a finally block.22 notifyingListeners = false;23 return;24 }25 listeners = this.listeners;26 this.listeners = null;27 }28 }29 }

先检查是否需要监听,满足条件后,判断listeners是否是DefaultFutureListeners,即包装后的数组

notifyListeners0方法:

1 private void notifyListeners0(DefaultFutureListeners listeners) {2    GenericFutureListener
[] a = listeners.listeners();3 int size = listeners.size();4 for (int i = 0; i < size; i ++) {5 notifyListener0(this, a[i]);6 }7 }

遍历这个数组,实则调用notifyListener0方法:

1 private static void notifyListener0(Future future, GenericFutureListener l) {2     try {3         l.operationComplete(future);4     } catch (Throwable t) {5         if (logger.isWarnEnabled()) {6             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);7         }8     }9 }

这里就可以看到,完成了对operationComplete的回调,处理future

 

setSuccess结束,再来看trySuccess方法:

1 @Override2 public boolean trySuccess(V result) {3     if (setSuccess0(result)) {4         notifyListeners();5         return true;6     }7     return false;8 }

对比setSuccess来看,只有返回值不一样

setFailure方法:

1 @Override 2 public Promise
setFailure(Throwable cause) { 3 if (setFailure0(cause)) { 4 notifyListeners(); 5 return this; 6 } 7 throw new IllegalStateException("complete already: " + this, cause); 8 } 9 10 private boolean setFailure0(Throwable cause) {11 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));12 }13 14 private boolean setValue0(Object objResult) {15 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||16 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {17 checkNotifyWaiters();18 return true;19 }20 return false;21 }

和setSuccess逻辑一样,只不过CAS操作将状态变为了CauseHolder对象,成功后唤醒listeners对异步操作的回调

tryFailure方法:

1 @Override2 public boolean tryFailure(Throwable cause) {3     if (setFailure0(cause)) {4         notifyListeners();5         return true;6     }7     return false;8 }

也都是一个逻辑

还有一个setUncancellable方法:

1 @Override2 public boolean setUncancellable() {3     if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {4         return true;5     }6     Object result = this.result;7     return !isDone0(result) || !isCancelled0(result);8 }

若是result状态为null,异步操作尚未结束,直接通过CAS操作将状态变为UNCANCELLABLE

否则若是根据状态来判断

下来看到cancel方法:

1 /** 2  * {
@inheritDoc} 3 * 4 * @param mayInterruptIfRunning this value has no effect in this implementation. 5 */ 6 @Override 7 public boolean cancel(boolean mayInterruptIfRunning) { 8 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { 9 checkNotifyWaiters();10 notifyListeners();11 return true;12 }13 return false;14 }

mayInterruptIfRunning正如注释中所说,在这里没有什么作用

还是通过CAS操作,将状态变为CANCELLATION_CAUSE_HOLDER,调用checkNotifyWaiters唤醒因sync阻塞的线程,notifyListeners方法回调listeners的侦听

最后看到sync方法:

1 @Override2 public Promise
sync() throws InterruptedException {3 await();4 rethrowIfFailed();5 return this;6 }

先调用await方法:

1 @Override 2 public Promise
await() throws InterruptedException { 3 if (isDone()) { 4 return this; 5 } 6 7 if (Thread.interrupted()) { 8 throw new InterruptedException(toString()); 9 }10 11 checkDeadLock();12 13 synchronized (this) {14 while (!isDone()) {15 incWaiters();16 try {17 wait();18 } finally {19 decWaiters();20 }21 }22 }23 return this;24 }

先判断能否执行(异步操作尚未结束,当前线程没有被中断),然后调用checkDeadLock方法:

1 protected void checkDeadLock() {2     EventExecutor e = executor();3     if (e != null && e.inEventLoop()) {4         throw new BlockingOperationException(toString());5     }6 }

检查轮询线程是否在工作

在synchronized块中以自身为锁,自旋等待异步操作的完成,若是没完成,调用incWaiters方法:

1 private void incWaiters() {2     if (waiters == Short.MAX_VALUE) {3         throw new IllegalStateException("too many waiters: " + this);4     }5     ++waiters;6 }

在小于Short.MAX_VALUE的情况下,对waiters自增,

然后使用wait将自身阻塞,等待被唤醒
所以在之前setValue0时,checkNotifyWaiters操作会notifyAll,
由此可以知道sync方法的作用:在某一线程中调用sync方法会使得当前线程被阻塞,只有当异步操作执完毕,通过上面的set方法改变状态后,才会调用checkNotifyWaiters方法唤醒当前线程。

当从阻塞中被唤醒后调用decWaiters方法:

1 private void decWaiters() {2     --waiters;3 }

使得waiters自减

通过这样一种自旋方式,一直等到isDone成立,结束自旋,进而结束await方法,然后调用rethrowIfFailed方法:

1 private void rethrowIfFailed() {2     Throwable cause = cause();3     if (cause == null) {4         return;5     }6 7     PlatformDependent.throwException(cause);8 }

根据异步操作是否有异常,进而使用PlatformDependent抛出异常。

至此Netty中的ChannelFuture和ChannelPromise分析到此全部结束。

转载于:https://www.cnblogs.com/a526583280/p/10965537.html

你可能感兴趣的文章
Angular CLI 使用教程指南参考
查看>>
nginx的配置文件详解
查看>>
MHA的介绍和测试(一)
查看>>
rsync配置和同步数据
查看>>
uva11630 or hdu2987 Cyclic antimonotonic permutations(构造水题)
查看>>
UIButton的resizableImageWithCapInsets使用解析
查看>>
[翻译] SCRecorder
查看>>
DDCTF2019 的四道题wp
查看>>
linux maven安装(三)
查看>>
Unity3D笔记十三 摄像机之间切换
查看>>
.eww
查看>>
MUI框架-01-介绍-创建项目-简单页面
查看>>
过滤数据
查看>>
Codeforces 992 范围内GCD,LCM要求找一对数 衣柜裙子期望
查看>>
ssh The authenticity of host '10.11.26.2 (10.11.26.2)' can't be established
查看>>
代码学习总结
查看>>
初入Installshield2015
查看>>
003 centos7中关闭防火墙
查看>>
仪表盘
查看>>
AutoCloseable的用法
查看>>