Netty5源码分析(七) -- 异步执行Future和Promise
java.util.concurrent.Future是Java提供的接口,表示异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成就返回结果,否则阻塞线程,直到任务完成。
// Java FutureTask.get() public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
Netty扩展了Java的Future,最主要的改进就是增加了监听器Listener接口,通过监听器可以让异步执行更加有效率,不需要通过get来等待异步执行结束,而是通过监听器回调来精确地控制异步执行结束的时间点。
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); boolean isCancellable(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); V getNow(); boolean cancel(boolean mayInterruptIfRunning); }
ChannelFuture接口扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时关联了Channel,跟一个Channel绑定
public interface ChannelFuture extends Future<Void> { Channel channel(); @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); }
Promise接口也扩展了Future接口,它表示一种可写的Future,就是可以设置异步执行的结果
public interface Promise<V> extends Future<V> { Promise<V> setSuccess(V result); boolean trySuccess(V result); Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); }
ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,又可写异步执行结构,又具备了监听者的功能,是Netty实际编程使用的表示异步执行的接口
public interface ChannelPromise extends ChannelFuture, Promise<Void> { @Override Channel channel(); @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause); @Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); }
DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promoise实例。Channel接口提供了newPromise接口,表示Channel要创建一个异步执行的动作
public interface Channel extends AttributeMap, Comparable<Channel> { ChannelPromise newPromise(); } public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { public ChannelPromise newPromise() { return new DefaultChannelPromise(this); } }
Netty推荐使用addListener的方式来回调异步执行的结果,这种方式优于Future.get,能够更精确地把握异步执行结束的时间。
看一下DefaultPromise的addListener方法,它判断异步任务执行的状态,如果执行完成,就理解通知监听者,否则加入到监听者队列
通知监听者就是找一个线程来执行调用监听的回调函数。
// DefaultPromise.addListener public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (isDone()) { notifyListener(executor(), this, listener); return this; } synchronized (this) { if (!isDone()) { if (listeners == null) { listeners = listener; } else { if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { @SuppressWarnings("unchecked") final GenericFutureListener<? extends Future<V>> firstListener = (GenericFutureListener<? extends Future<V>>) listeners; listeners = new DefaultFutureListeners(firstListener, listener); } } return this; } } notifyListener(executor(), this, listener); return this; } protected static void notifyListener( final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) { if (eventExecutor.inEventLoop()) { final Integer stackDepth = LISTENER_STACK_DEPTH.get(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { LISTENER_STACK_DEPTH.set(stackDepth + 1); try { notifyListener0(future, l); } finally { LISTENER_STACK_DEPTH.set(stackDepth); } return; } } try { eventExecutor.execute(new Runnable() { @Override public void run() { notifyListener0(future, l); } }); } catch (Throwable t) { logger.error("Failed to notify a listener. Event loop shut down?", t); } } @SuppressWarnings({ "unchecked", "rawtypes" }) static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } }
再来看监听者的接口,就一个方法,即等异步任务执行完成后,拿到Future结果,执行回调的逻辑
public interface GenericFutureListener<F extends Future<?>> extends EventListener { void operationComplete(F future) throws Exception; }
来看一个实例,在服务器绑定的过程中,会调用initAndRegister()来创建并注册Channel。
1. 这个过程中先通过Channel.newPromise创建了一个异步任务,然后把Promise实例传递给register方法。
2. register方法找一个线程来执行register0方法,第一步的方法在把任务交给线程后就返回了。这是一个异步执行的过程
3. 在新起的线程中,设置Promise的状态。可以看到Promise表示了可写的Future
4. 原线程在拿到initAndRegister返回的结果后,继续执行,这时候是由两个线程在执行
5. 原线程判断Promise的状态来判断是否注册完成,如果注册完成就执行后续的doBind0,如果没有完成,就通过添加回调的方法来进行异步执行
final ChannelFuture initAndRegister() { Channel channel; try { channel = createChannel(); } catch (Throwable t) { return VoidChannel.INSTANCE.newFailedFuture(t); } try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelPromise regFuture = channel.newPromise(); channel.unsafe().register(regFuture); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } because register(), bind(), and connect() are all bound to the same thread. return regFuture; } <pre name="code" class="java"> public final void register(final ChannelPromise promise) { if (eventLoop.inEventLoop()) { register0(promise); } else { try { <strong> eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });</strong> } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } doRegister(); registered = true; promise.setSuccess(); pipeline.fireChannelRegistered(); if (isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } } }
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it‘s not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。