1. 概述 因 Netty 中大量使用异步的调用方式,启动流程中的代码在不同的线程中执行,给分析其启动的顺序带来了一定的麻烦。这篇文章主要是对 ServerBootstrap 服务器启动流程做一个整体性的讲述,分析了每一个步骤所承担的工作,以及前后步骤的触发关系,即前一个步骤怎么调用后一个步骤,仍然以服务器启动的代码为例,分析其流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 EventLoopGroup bossGroup = new NioEventLoopGroup (1 );EventLoopGroup workerGroup = new NioEventLoopGroup ();final EchoServerHandler serverHandler = new EchoServerHandler ();try { ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100 ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null ) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
前后的文章已经分析过 EventLoopGroup 及 ChannelFutue,这里重点分析 bind 流程。
主要流程如下:
新建 NioServerSocketChannel 对象;
初始化 NioServerSocketChannel 对象,主要是向其添加 ChannelHandler 对象;
注册 NioServerSocketChannel 对象,主要是将该对象注册到 EventLoop 和 底层的 Selector 对象中;
执行 NioServerSocketChannel 对象的 bind 操作;
执行 NioServerSocketChannel read 操作,主要是向 Selector 对象注册 OP_ACCEPT 事件,完成之后便可接收网络请求。
2. 执行流程 2.1 新建 NioServerSocketChannel 对象 在 Netty 中,Nio Tcp Channel 有两种类型,一种是 NioServerSocketChannel,它代表了服务器 Channel,接收网络请求(OP_ACCEPT),另外一种是 NioSocketChannel,它代表了一次网络连接,读取网络数据。在 ServerBootstrap 中使用是 NioServerSocketChannel 对象,该对象通过反射方式生成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) public B channel (Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory <C>( ObjectUtil.checkNotNull(channelClass, "channelClass" ) )); } public class ReflectiveChannelFactory <T extends Channel > implements ChannelFactory <T> { private final Constructor<? extends T > constructor; public ReflectiveChannelFactory (Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz" ); try { this .constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { ... } } @Override public T newChannel () { try { return constructor.newInstance(); } catch (Throwable t) { ... } } } final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { ... } ... return regFuture; }
新建 NioServerSocketChannel 对象实际是使用 ReflectiveChannelFactory 对象来生成。
2.2 初始化 NioServerSocketChannel 对象 初始化工作主要是设置 NioServerSocketChannel 对象中的 ChannelPipeline,该 ChannelPipeline 对象以责任链模式维护一个 ChannelHandler 链表,用于处理后续的网络连接。在这里主要是加入一个 ChannelInitializer handler 类,它是一个特殊的 ChannelHandler 类,用于完成 ChannelHandler 列表的添加。在这个例子中,它加入了两个 ChannelHandler 类,一个是 LoggingHandler 类,它的功能是打印日志,由 ServerBootstrap.handler 方法加入, 一个是 ServerBootstrapAcceptor 类,它主要用来对新加入的 SocketChannel 进行初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void init (Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel (final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable () { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor ( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
ServerBootstrapAcceptor 主要的功能是处理 NioServerSocketChannel 的 OP_ACCEPT,即接收一个新的网络请求(对应一个 SocketChannel 对象),对其设置相关的参数及 ChannelHandler列表,这些配置来来自于 ServerBootstrap 的启动配置参数,最后为该 SocketChannel 分配一个 EventLoop 对象,实现网络连接的负载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup.register(child).addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
另外说明一点,ChannelPipeline 的 addLast 操作需要在 NioServerSocketChannel 对象注册到 EventLoop 中之后再会执行,此时 NioServerSocketChannel 还未注册,所以执行 addLast 方法只是简单向 ChannelPipeline 中添加一个 PendingHandlerAddedTask,等注册操作完成之后再进行调用。
2.3 注册 NioServerSocketChannel 对象 注册的操作主要包括两个部分:
将 NioServerSocketChannel 分配给 EventLoop,该操作是通过 BossGropu 来完成;
将 NioServerSocketChannel 注册到 Selector 对象上,用于接收网络请求。 每一个 NioServerSocketChannel 都包含一个 Java SelectableChannel 对象,网络请求最终都是通过这个对象来完成。
register 调用顺序为 AbstractBootstrap.initAndRegister –> MultithreadEventLoopGroup.register –> SingleThreadEventLoop –> AbstractUnsafe.register,注册操作最终是调用 AbstractUnsafe 类来完成的。这里先看下 initAndRegister 方法,register 与 后续的 bind 都是异步操作,而 bind 操作需要 register 操作成功之后再执行,这两个操作的协调就是在这一步完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { ... } ChannelFuture regFuture = config().group().register(channel); ... return regFuture; } private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise (channel); regFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
从上面的代码可以看出,register 与 bind 的操作是借助 ChannelFuture 完成的,主要是通过向 regFuture 中添加 ChannelFutureListener 监听器,在 register 完成之后调用 safeSetSuccess(regFuture) 触发调用监听器代码。
注册操作的核心代码在 AbstractUnsafe.register 方法中,再将 将 EventLoop 赋值给 AbstractChannel 对象,会向 EventLoop 提交一个异步任务用来执行 register0 方法,该方法包括了将 AbstractChannel 注册到 Selector 的 I/O 操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { } } }
register0 方法在 EventLoop 线程中执行,它主要包括下面几个步骤:
执行操作系统层面的注册操作,主要是调用 java api 来实现;
将 channel 状态设置为注册完成状态;
向 pipeline 添加 ChannelHandler,在这里调用 channel 初始化时添加到 pipeline 中的 PendingHandlerAddedTask;
将 regFuture 设置为成功完成状态,并触发调用 ChannelFutureListener 监听器,最终会调用 bind 操作;
触发 ChannelRegistered 事件,调用 ChannelHandler 中的回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void register0 (ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; ... pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); ... } catch (Throwable t) { ... } }
doRegister 方法调用的是 Java NIO api 完成网络层面的注册操作,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
doRegister 方法在 AbstractNioChannel 中实现,在这个方法中,主要是将 ServerSocketChannel 注册到底层的 selector 上,由它来监听 ServerSocketChannel 的 I/O 事件,不过在此时还没有注册感兴趣的 I/O 事件,是由后面的 read 操作来完成。
2.4 执行 bind 操作 bind 操作主要是完成底层 ServerSocketChannel 对象的地址绑定操作,其调用顺序为:AbstractBootstrap.doBind0 –> AbstractChannel.bind –> DefaultChannelPipeline.bind –> AbstractChannelHandlerContext.bind –> HeadContext.bind –> AbstractUnsafe.bind。最后调用 AbstractUnsafe 中的 bind 方法。
在 bind 方法中,主要做了下面的工作:
执行地址绑定操作,具体实现取决于 Channel 的子类;
判断 channel 是否 Active,正常情况,绑定成功之后便会激活 channel;
触发 channelActive 事件,执行后续的 read 操作;
设置成功完成 bind 操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
doBind 方法的实现由 channel 的子类实现,在这里由 NioServerSocketChannel 来实现,如代码如下所示:
1 2 3 4 5 6 7 8 protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
doBind 执行完成之后,触发 channelActive 事件,在 channelActive 回调函数中再触发读事件,最后完成 OP_ACCEPT 事件的注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); } private void readIfIsAutoRead () { if (channel.config().isAutoRead()) { channel.read(); } }
在 channelActive 中触发调用 channel.read() 操作。
2.5 执行 read 操作 在 channelActive 中触发调用 read 的顺序为:AbstractChannel.read –> DefaultChannelPipeline.read –> AbstractChannelHandlerContext.read –> HeadContext.read –> AbstractUnsafe.beginRead,最终调用 AbstractUnsafe.beginRead 方法。
AbstractUnsafe.beginRead 中会调用 channel 的 doBeginRead 方法,该方法也是抽象文件,具体实现取决于子类,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public final void beginRead () { assertEventLoop(); if (!isActive()) { return ; } try { doBeginRead(); } catch (final Exception e) { ... } } protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } } public NioServerSocketChannel (ServerSocketChannel channel) { super (null , channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig (this , javaChannel().socket()); }
NioServerSocketChannel 类中,selectionKey 为 SelectionKey.OP_ACCEPT,即监听 ServerSocket 的 OP_ACCEPT 事件,注册完该事件之后,NioServerSocketChannel 便可接收网络请求了。
3. 总结 ServerBootstrap bind 方法是相对比较复杂的一个方法,它涉及到了 Netty 中各个组件,并将它们有机整合在一起。通过对 bind 方法的分析,对 Netty 的整体流程有了一个初步的理解,这会对使用 Netty 大有裨益。