1. 概述 Netty 中所有的的 I/O 操作都是异步的。I/O 操作是比较耗时的,为了不阻塞调用线程,Netty 提供了 ChannelFuture 接口,使用 addListener()方法注册一个 ChannelFutureListener 监听器,可以在 I/O 操作结束之后进行通知返回结果。在下面的代码中,bind 操作返回一个 ChannelFuture 对象,可以继续执行后续操作,也可以调用 sync() 方法同步等待执行结果,给程序开发带来了更多的开发模式,结合不同的业务场景,可以方便选择异步还是同步模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 EventLoopGroup bossGroup = new NioEventLoopGroup (1 );EventLoopGroup workerGroup = new NioEventLoopGroup ();final EchoServerHandler serverHandler = new EchoServerHandler ();try { ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100 ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null ) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
这篇文章的主要目的是分析 ChannelFuture 在 Netty 中的实现原理。
2. 原理 ChannelFutrue 本质上是线程间交换数据的方式,一个线程等待另外一个线程的处理结果,取得结果一般有两种方式:1)同步等待,如同 get() 方法;2)注册回调,在设置结果的同时调用回调函数。其伪代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class ChannelFutrue { private final CountDownLatch countDownLatch = new CountDownLatch (1 ); private List<GenericFutureListener> listeners = new ArrayList <>(); private volatile Object result; public void setSuccess (Object result) { this .result = result; countDownLatch.countDown(); listeners.stream().forEach(listener -> { try { listener.operationComplete(result); } catch (Exception ex) { ex.printStackTrace(); } }); } public void addListener (GenericFutureListener listener) { listeners.add(listener); } public Object get () throws InterruptedException { countDownLatch.await(); return result; } public Object bind () throws InterruptedException { return get(); } public interface GenericFutureListener { void operationComplete (Object result) throws Exception; } }
通过持有 ChannelFutrue 类,调用方可以同步或异步获取执行的结果,在这个例子中,为了简化操作,我们使用 CountDownLatch 进行同步,而在 ChannelFutrue 使用 synchronized + notify/await 来实现线程的同步。
3. Netty 实现 我们以 ServerBootstrap 中的 bind 方法为例,分析 ChannelFuture 在这个流程中的使用方式,bind 方法的主要流程如下所示(其中的实现细节在后续篇章介绍):
在 bind 方法中主要包含在 4 个步骤:
生成 NioServerSocketChannel 对象;
将 NioServerSocketChannel 对象注册到 EventLoop 中;
执行 bind 操作;
同步等待 bind 操作执行完成。
3.1 register 流程 可以看到第 2 和 3 步都是一个 I/O 操作,为了避免调用线程被阻塞,它们都被提交到 EventLoop 线程(每一个 EventLoop 对象都会绑定一个线程)中执行,并返回一个 ChannelFuture 对象,一个 I/O 操作会对应一个ChannelFuture 对象,调用线程与 EventLoop 通过该对象完成执行结果的交换。下面以 register 方法为例,分析下 ChannelFuture 对象的使用。
1、生成 ChannelFuture 对象
调用 register 之后返回一个 DefaultChannelPromise 对象,该对象是 ChannelFuture 的子类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public ChannelFuture register (Channel channel) { return register(new DefaultChannelPromise (channel, this )); } @Override public ChannelFuture register (final Channel channel, final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise" ); ObjectUtil.checkNotNull(channel, "channel" ); channel.unsafe().register(this , promise); return promise; }
2、提交异步注册任务
提交注册任务的逻辑在 AbstractChannel.AbstractUnsafe 中,提交的时候会判断当前线程,如果当前线程是 eventLoop 线程,直接执行即可,如果不是,则提交一个任务到 eventLoop 线程 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Override public final void register (EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop" ); if (isRegistered()) { promise.setFailure(new IllegalStateException ("registered to an event loop already" )); return ; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException ("incompatible event loop type: " + eventLoop.getClass().getName())); return ; } AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}" , AbstractChannel.this , t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
3、执行注册逻辑
register0 是在 EventLoop 线程中执行的,与调用注册方法的线程不是同一个。注册的逻辑通过子类的 doRegister() 方法实现,注册完成之后通过 safeSetSuccess(promise) 和 safeSetFailure(promise, t) 通知注册结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
4、结果通知
结果通知主要包含两个操作:
设置处理结果,唤醒所有等待的线程;
调用注册到 ChannelFuture 中的监听器;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 protected final void safeSetSuccess (ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}" , promise); } } private boolean setValue0 (Object objResult) { if (RESULT_UPDATER.compareAndSet(this , null , objResult) || RESULT_UPDATER.compareAndSet(this , UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true ; } return false ; } private synchronized boolean checkNotifyWaiters () { if (waiters > 0 ) { notifyAll(); } return listeners != null ; } private void notifyListeners () { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1 ); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return ; } } safeExecute(executor, new Runnable () { @Override public void run () { notifyListenersNow(); } }); } private static void notifyListener0 (Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()" , t); } } }
3.2 异步操作的协同 在上面的操作中,bind 操作依赖 register 操作的结果,由于这两个操作都是异步操作,如何进行协同?即在 register 操作成功执行 bind 操作。正常情况下,有两种办法:1)同步等待操作执行完成;2)通过添加 GenericFutureListener 监听器,执行完由 EventLoop 线程进行回调。在这里是通过第二种方式来操作的。 在执行 initAndRegister 操作之后,会得到一个 ChannelFuture regFuture 对象,此时 register 已经提交给 EventLoop 执行,不一定执行完成,需要判断执行结果,如果未完成,则向 regFuture 对象中添加监听器,在监听器中调用 bind 操作,而监听器会中注册完成之后调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise (channel); regFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable () { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
3.3 sync 同步操作 由于 bind 操作是一个异步操作,此时在调用线程中需要等待绑定的结果,所以调用了 sync 方法。另外,在程序的最后,也使用了一个 ChannelFuture,用于等待 Channel 关闭事件。
1 2 3 4 5 6 ChannelFuture f = b.bind(PORT).sync();f.channel().closeFuture().sync();
3.3 ChannelFuture 线程同步 ChannelFuture 中的线程同步方式是 synchronized 同步块,如下代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public Promise<V> await () throws InterruptedException { if (isDone()) { return this ; } if (Thread.interrupted()) { throw new InterruptedException (toString()); } checkDeadLock(); synchronized (this ) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this ; } private synchronized boolean checkNotifyWaiters () { if (waiters > 0 ) { notifyAll(); } return listeners != null ; }
在调用 await 操作时,如果没有结果(操作未完成),则会调用 wait 方法阻塞该线程,同时增加等待的线程数;操作完成之后会调用 notifyAll 方法,通知所有等待的线程继续执行,这样完成了调用结果在不同线程间的交互。
4. 总结 ChannelFuture 本质是线程间通信的一种工具,通过 ChannelFuture,可以实现 I/O 的异步操作,并完成操作结果的通知功能。