1. 概述
Reactor
模式是一种服务器网络编程模式,它根据网络数据接收的特点,将连接的建立、网络数据的读写分离,用
mainReactor 线程处理网络的连接,用 subReactor
处理数据的读写,同时为了有效利用 CPU 多核的优势,subActor
可以有多个。它的整体结构如下图所示:
特点:
客户端的所有连接请求统一由 mainReactor 线程处理,同时将收到请求转交
subReactor 处理;
subReactor 线程处理连接的读写,为了实现处理的负载,可以有多个
subReactor,通过一定的算法分配网络连接;
考虑到连接的 I/O
读写比较耗时,为了提高吞吐量,读写操作可以交由线程池处理。
说明: 文中说到的“网络连接”与下文说到的 "channel" 和
"socketChannel" 是一个概念。
另外,这篇文章主要包含三个部分的内容:1)Reactor
概念的介绍;2)Reactor 的模拟;3)Netty 中的实现;现在我们用 Java
模拟一个 Reactor 的实现。
2. 原理
2.1 MainReactor
我们以一个例子来模拟一个 Reactor,先看 MainReactor
类的代码,它主要的功能是监听 9090 端口接收网络连接,并将网络请求注册到
SubReactor 类,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 public class MainReactor implements Runnable { private static final int PORT = 9090 ; private Selector selector; private ServerSocketChannel serverChannel; private SelectorManager manager; private boolean isStop; public MainReactor () throws Exception { init(); } private void init () throws Exception { isStop = false ; selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.socket().bind(new InetSocketAddress (PORT)); serverChannel.configureBlocking(false ); serverChannel.register(selector, SelectionKey.OP_ACCEPT); manager = new SelectorManager (); } public void doStart () { Thread seletorThread = new Thread (this ); seletorThread.start(); } private void process (SocketChannel channel) { manager.register(channel); } public void shutdown () { isStop = true ; } @Override public void run () { try { while (!isStop) { selector.select(); Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()) { System.out.println("accept thread:" + Thread.currentThread().getName()); SelectionKey key = iterator.next(); iterator.remove(); ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); System.out.println("receive a connection:" + client.socket().getRemoteSocketAddress()); process(client); } } } catch (Exception e) { e.printStackTrace(); } } public static void main (String[] args) throws Exception { MainReactor server = new MainReactor (); server.doStart(); } }
MainReactor 有几个主要的属性:
Selector:Selector 对象,用于实现网络 I/O
事件的监听,它只监听网络请求事件;
ServerSocketChannel:服务器套接字,用于接收网络请求;
SelectorManager:用于分配 SocketChannel 到
subReactor,SelectorManager 存有多个 subReactor 对象。
2.2 SubReactor
SubReactor 主要是处理 SocketChannel 的读写,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 public class SubReactor implements Runnable { private Selector selector; private List<SocketChannel> queue; private boolean isStop; private ThreadPoolManager pool; public SubReactor () { isStop = false ; pool = ThreadPoolManager.getInstance(); } public SubReactor (Selector sel) { isStop = false ; this .selector = sel; queue = new LinkedList <SocketChannel>(); pool = ThreadPoolManager.getInstance(); } public void start () { Thread thread = new Thread (this ); thread.start(); } public synchronized void register (SocketChannel channel) { queue.add(channel); } public void shutdown () { isStop = true ; } @Override public void run () { try { while (!isStop) { configuration(); int count = selector.select(); if (count == 0 ) { continue ; } Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()) { System.out.println("io thread:" + Thread.currentThread().getName()); SelectionKey key = iterator.next(); iterator.remove(); if (!key.isValid()) { continue ; } if (key.isReadable()) { System.out.println(Thread.currentThread().getName() + " read........" ); handleRead(key); } if (key.isValid() && key.isWritable()) { System.out.println(Thread.currentThread().getName() + " write........" ); handleWrite(key); } } } } catch (Exception e) { e.printStackTrace(); } } private void process (SelectionKey key) { HandlerContainer container = (HandlerContainer) key.attachment(); IoHandler handler = container.getHandler(); handler.setKey(key); pool.execute(handler); } public synchronized void configuration () throws Exception { int length = queue.size(); for (int i = 0 ; i < length; i++) { SocketChannel channel = queue.get(i); channel.configureBlocking(false ); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); IoHandler handler = new IoHandler (); HandlerContainer container = new HandlerContainer (handler); key.attach(container); } queue.clear(); } public void handleRead (SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer input = ByteBuffer.allocate(1024 ); HandlerContainer container = (HandlerContainer) key.attachment(); ByteBuffer buf = container.getBuf(); long bytesRead = clntChan.read(input); if (bytesRead == -1 ) { clntChan.close(); System.out.println("connection closed!" ); return ; } input.flip(); while (input.hasRemaining()) { byte b = input.get(); buf.put(b); if (b == '\n' ) { System.out.println("meet a new line!" ); buf.flip(); byte [] msg = new byte [buf.limit()]; buf.get(msg); buf.flip(); String message = new String (msg); System.out.println("receive message:" + message); process(key); } } } public void handleWrite (SelectionKey key) throws IOException { HandlerContainer container = (HandlerContainer) key.attachment(); IoHandler handler = container.getHandler(); handler.handleWrite(key); } }
SubReactor 的功能主要是负载监听 SocketChannel
的读写事件,然后分发给线程池去处理。
2.3 Channel 的分配
MainReactor 接收到新的连接,会产生一个 SocketChannel
对象,按照一定的算法分配给 SubReactor。这个分配主要由 SelectorManager
对象完成,我们分析下其代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class SelectorManager { private Selector[] selector; private SubReactor[] subReactors; private int next; private int length; public SelectorManager () throws Exception { length = Runtime.getRuntime().availableProcessors(); selector = new Selector [length]; subReactors = new SubReactor [length]; for (int i = 0 ; i < length; i++) { selector[i] = Selector.open(); subReactors[i] = new SubReactor (selector[i]); } next = 0 ; for (int i = 0 ; i < length; i++) { subReactors[i].start(); } } public void register (SocketChannel channel) { subReactors[next].register(channel); selector[next].wakeup(); System.out.println("chose subRactor:" + next); if (++next == length) { next = 0 ; } } }
在这里,SelectorManager 的功能主要包括两个方面:1)创建及初始化
SubReactor 数组;2)根据轮洵算法分配 Channel。
3. 实现
Netty 实现了 Reactor 模式,其整体结构如下所示:
在 Netty 服务启动的时候会配置两个 EventLoopGroup bossGroup 和
WrokerGroup,EventLoopGroup 可以包含一个或多处 EventLoop,每一个
EventLoop 包含一个 Selector (也可能是
epoll,取决于实现)对象,同时它是一个独立的线程,可独立负载 I/O
请求。对比 Reactor,bossGroup 相当于
MainReactor,这负责监听网络的连接请求(生成
SocketChannle对象),并将其分配给 workerGroup,在这里,只包含一个
EventLoop;workerGroup 相当于 subReactor,监听连接的读写请求。下面分析下
Netty 中关于 EventLoopGroup 的代码实现。
3.1 初始化
1、线程数设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 EventLoopGroup bossGroup = new NioEventLoopGroup (1 );EventLoopGroup workerGroup = new NioEventLoopGroup ();final EchoServerHandler serverHandler = new EchoServerHandler ();try { ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100 ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null ) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } b.group(bossGroup, workerGroup);
在这里使用的是 NioEventLoopGroup,bossGroup 设置的线程数为 1,而
workerGroup 没有设置线程数,使用默认配置的数量:2 * cpu size。
1 2 3 4 5 6 7 8 9 protected MultithreadEventLoopGroup (int nThreads, Executor executor, Object... args) { super (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } DEFAULT_EVENT_LOOP_THREADS = Math.max(1 , SystemPropertyUtil.getInt( "io.netty.eventLoopThreads" , NettyRuntime.availableProcessors() * 2 ));
2、创建 EventLoop 数组
NioEventLoopGroup 是 EventLoop 对象的容器集合,持有多个 EventLoop
对象,它的数量与线程数量一致,同时 NioEventLoopGroup 负责分配
SocketChannel,需要有一个分配的策略对象,这些是在其父类的构造函数中实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 protected MultithreadEventExecutorGroup (int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0 ) { throw new IllegalArgumentException (String.format("nThreads: %d (expected: > 0)" , nThreads)); } if (executor == null ) { executor = new ThreadPerTaskExecutor (newDefaultThreadFactory()); } children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { ... } chooser = chooserFactory.newChooser(children); ... }
在 MultithreadEventExecutorGroup 构造函数中,主要做了三个工作:
定义 EventLoop 中的线程执行器,每一个 EventLoop
都包含一个线程,其线程由 ThreadPerTaskExecutor 生成;
初始化及生成 EventLoop 数组 ,newChild
方法由子类来实现,不同的模式有不同的实现;
定义 channel 的分配策略,根据 EventLoop 的数量有不同的实现。
在 NioEventLoopGroup 中,newChild 实现代码所示: 1 2 3 4 5 protected EventLoop newChild (Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3 ] : null ; return new NioEventLoop (this , executor, (SelectorProvider) args[0 ], ((SelectStrategyFactory) args[1 ]).newSelectStrategy(), (RejectedExecutionHandler) args[2 ], queueFactory); }
newChild 方法的细节在后面的文章中再进行介绍。
3、Channel 分配策略
channel 的分配策略有两种,分别是:PowerOfTwoEventExecutorChooser 和
GenericEventExecutorChooser,它们本质上都是轮洵算法,只是当 EventLoop
的数量是 2 的幂次方时,对算法做了优化,使用位操作代替取余操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public EventExecutorChooser newChooser (EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser (executors); } else { return new GenericEventExecutorChooser (executors); } } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger (); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this .executors = executors; } @Override public EventExecutor next () { return executors[idx.getAndIncrement() & executors.length - 1 ]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicLong idx = new AtomicLong (); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this .executors = executors; } @Override public EventExecutor next () { return executors[(int ) Math.abs(idx.getAndIncrement() % executors.length)]; } }
3.2 Channel 注册
在 Nio 模式下,Channel 有两种类型,分别是:NioServerSocketChannel 和
NioSocketChannel,其中 NioServerSocketChannel 用于监听网络连接请求,生成
NioSocketChannel 连接,该 Channle 注册到 BossGroup 的 EventLoop 中,而
NioSocketChannel 负责真正的网络读写,注册到 WorkerGroup 的 EventLoop
中。
1、NioServerSocketChannel 注册
在 Netty 的服务器启动过程中,主要的流程是一个 bind
操作,其流程包括:
创建 NioServerSocketChannel 类,完成初始化的工作,其中包括添加
ChannelHandler 类;
将 NioServerSocketChannel 注册到 EventLoop 中,同时向 Selector
对象中注册,不过此时并没有注册 OP_ACCEPT 事件;
执行网络层的 bind 操作;
执行读操作,主要是向 Selector 注册 OP_ACCEPT
事件。执行该操作后,便可接收网线的连接请求了。
NioServerSocketChannel 注册穿插在上面的 4 个步骤中,主要包括 1)将
NioServerSocketChannel 注册到 EventLoop 中;2)向 Selector 对象注册
OP_ACCEPT 事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public final void register (EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { ... } } } protected void doRegister () throws Exception { boolean selected = false ; while (true ) { try { this .selectionKey = this .javaChannel().register(this .eventLoop().selector, 0 , this ); return ; } catch (CancelledKeyException var3) { if (selected) { throw var3; } this .eventLoop().selectNow(); selected = true ; } } } protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
在上面的代码可以看到 registor 操作主要是分配一个 EventLoop,并将
EventLoop 赋值给 NioServerSocketChannel。向 Selector 注册 Channel
则分为两次,第一次注册时事件参数为
0,等于没有注册任何事件;第二次是在底层 Channel bind
操作之后,准备就绪之后,再注册 OP_ACCEPT 事件。
2、NioSocketChannel 注册 在 bind
操作的流程中,第一步是创建 NioServerSocketChannel
类,并进行初始化,此时会注册 ChannelHandler 类,其中就有一个
ServerBootstrapAcceptor handler
类,它的主要功能就是收到网络请求之后对NioSocketChannel
类进行参数配置,将其注册到 workerGroup 中,其核心代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup.register(child).addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
NioSocketChannel 和 NioServerSocketChannel
注册流程是一致的,差别只是注册到不同的 EventLoopGroup 及注册不同的 I/O
事件,其中 NioSocketChannel 注册的是 OP_READ 事件,而
NioServerSocketChannel 注册的是 OP_ACCEPT 事件。
3.3 事件循环
EventLoop 本质是一个事件循环,不断地从 Selector (Epoll) 对象中获取
I/O
事件,执行解码/反序列化操作后,再分发到上层的业务线程进行处理。另外一方面它可以执行用户自定义任务,如定时进行
Channel 空闲状态的检测,其核心代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 protected void run () { int selectCnt = 0 ; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue ; ... } } catch (IOException e) { ... } selectCnt++; cancelledKeys = 0 ; needsToSelectAgain = false ; final int ioRatio = this .ioRatio; boolean ranTasks; if (ioRatio == 100 ) { try { if (strategy > 0 ) { processSelectedKeys(); } } finally { ranTasks = runAllTasks(); } } else if (strategy > 0 ) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0 ); } ... } catch (Throwable t) { handleLoopException(t); } finally { ... } } }
从上面的代码可以看到,根据计算的执行策略,可以为 I/O
事件处理及自定义任务分配不同的执行时间,详尽的代码在后面的文章介绍。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { ... try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
I/O 事件的处理本质是处理 channel 的各种 I/O 事件,其中将 OP_ACCEPT
抽象为 Netty的 read 事件,可以理解为读取的数据是 NioSocketChannel
对象,其代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected int doReadMessages (List<Object> buf) throws Exception { SocketChannel ch = this .javaChannel().accept(); try { if (ch != null ) { buf.add(new NioSocketChannel (this , ch)); return 1 ; } } catch (Throwable var6) { logger.warn("Failed to create a new channel from an accepted socket." , var6); try { ch.close(); } catch (Throwable var5) { logger.warn("Failed to close a socket." , var5); } } return 0 ; }
可以看出来,NioServerSocketChannel 收到 OP_ACCEPT 事件后,会生成
SocketChannel 对象,然后通过 ServerBootstrapAcceptor handle
类处理后,注册到 workerGroup 中,再监听 SocketChannel 对象的 OP_READ
事件,最终实现网络数据的读写。
4. 总结
通过对 Netty 中 Reactor 模型的分析,对 Netty 的线程模型及 I/O
的事件处理有了一个初步的认识,后续的文章将对涉及到的模块进行详尽的分析,希望能够深入理解
Netty 的设计思路。