1. 概述 EventLoop 是 Reactor 模式中的执行者,首先它持有 Selector 对象,监听多路 SocketChannel 的网络 I/O 事件,并对 I/O 事件分发处理。同时,它持有一个 Thread 对象,除了监听网络 I/O 事件, EventLoop 也可以执行提交的任务,包括定时任务,总结来说,EventLoop 具有如下三大功能:
负责监听 SocketChannel 对象的 I/O 事件;
处理分发 I/O 事件;
执行任务,包括定时任务。
整体的处理流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 protected void run () { int selectCnt = 0 ; for (;;) { try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } final int ioRatio = this .ioRatio; final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } catch (Throwable t) { handleLoopException(t); } finally { } } }
说明: 代码以 NioEventLoop 为例。
2. 流程 上文分析了 EventLoop 的主要功能,在这部分内容中主要讲述这三大功能及线程的创建,我们先从线程的创建开始。
2.1 线程创建 线程相关的变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private volatile Thread thread;private volatile int state = ST_NOT_STARTED;private static final int ST_NOT_STARTED = 1 ;private static final int ST_STARTED = 2 ;private static final int ST_SHUTTING_DOWN = 3 ;private static final int ST_SHUTDOWN = 4 ;private static final int ST_TERMINATED = 5 ;private final Executor executor;
EventLoop 中持有一个线程的引用,在第一次执行任务的时候启动,提供任务的线程如果已经是 EventLoop 线程,将任务提交给任务即可,如果是非 EventLoop 线程,则需要启动 EventLoop 线程。
1 2 3 4 5 6 7 8 9 10 private void execute (Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); } }
EventLoop 中通过 state 字段来标识是否启动,通过判断该字段来决定是否启动线程,其中 state 字段是 volatile 类型,并通过 AtomicIntegerFieldUpdater 进行原子更新,保证线程的安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void startThread () { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this , ST_NOT_STARTED, ST_STARTED)) { boolean success = false ; try { doStartThread(); success = true ; } finally { if (!success) { STATE_UPDATER.compareAndSet(this , ST_STARTED, ST_NOT_STARTED); } } } } }
线程的启动是通过 executor 添加一个任务,在 executor 中启动一个线程,再把该线程赋值给 EventLoop,等同于使用了只有一个线程的线程池来生成线程。可以看到,任务的主体是 SingleThreadEventExecutor.this.run() 方法,该方法就是前文说到的 EventLoop 主体业务逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void doStartThread () { assert thread == null ; executor.execute(new Runnable () { @Override public void run () { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false ; updateLastExecutionTime(); try { SingleThreadEventExecutor.this .run(); success = true ; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: " , t); } finally { } } }); }
executor 使用的是 ThreadPerTaskExecutor 对象,在 NioEventLoopGroup 初始化的时候生成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public NioEventLoopGroup (int nThreads) { this (nThreads, (Executor) null ); } protected MultithreadEventExecutorGroup (int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (executor == null ) { executor = new ThreadPerTaskExecutor (newDefaultThreadFactory()); } children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { } } } protected EventLoop newChild (Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3 ] : null ; return new NioEventLoop (this , executor, (SelectorProvider) args[0 ], ((SelectStrategyFactory) args[1 ]).newSelectStrategy(), (RejectedExecutionHandler) args[2 ], queueFactory); }
ThreadPerTaskExecutor 执行器会生成一个新的线程来执行新的任务,该线程就是 FastThreadLocalThread 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; @Override public void execute (Runnable command) { threadFactory.newThread(command).start(); } } public class DefaultThreadFactory implements ThreadFactory { @Override public Thread newThread (Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); return t; } protected Thread newThread (Runnable r, String name) { return new FastThreadLocalThread (threadGroup, r, name); } }
通过上面的代码分析,EventLoop 中的线程由 ThreadPerTaskExecutor 执行器生成,线程对象为 DefaultThreadFactory。
2.3 监听网络 I/O 事件 Selector 相关的变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private Selector selector;private Selector unwrappedSelector;private SelectedSelectionKeySet selectedKeys;private final SelectorProvider provider;private static final long AWAKE = -1L ;private static final long NONE = Long.MAX_VALUE;private final AtomicLong nextWakeupNanos = new AtomicLong (AWAKE);private final SelectStrategy selectStrategy;private int cancelledKeys;private boolean needsToSelectAgain;
在这个阶段有三个重点:
设置 selector 的超时时间,主要是以下一个定时任务执行的时间间隔作为参考来设置超时时间,避免阻塞定时任务的准时执行;
selector 唤醒的机制,如果超时时间过长,中途有任务插入,需要执行,此时需要中断 selector;
重建 selector,解决 bug 8566。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 protected void run () { int selectCnt = 0 ; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue ; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L ) { curDeadlineNanos = NONE; } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default : } } catch (IOException e) { rebuildSelector0(); selectCnt = 0 ; handleLoopException(e); continue ; } } catch (Throwable t) { handleLoopException(t); } finally { } } }
计算下一个定时任务执行的时间间隔逻辑比较简单,将 ScheduledFutureTask 任务添加到 scheduledTaskQueue 队列中,而 scheduledTaskQueue 是一个优先级队列,它已经将 ScheduledFutureTask 根据执行时间进行了排序,取出的第一个元素便是最近将要执行的任务,计算它还需要多久需要执行便可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 protected final long nextScheduledTaskDeadlineNanos () { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); return scheduledTask != null ? scheduledTask.deadlineNanos() : -1 ; } final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this .scheduledTaskQueue; return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null ; } PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = new DefaultPriorityQueue <ScheduledFutureTask<?>>( SCHEDULED_FUTURE_TASK_COMPARATOR, 11 ); final class ScheduledFutureTask <V> extends PromiseTask <V> implements ScheduledFuture <V>, PriorityQueueNode { private static final long START_TIME = System.nanoTime(); private long id; private long deadlineNanos; private final long periodNanos; private int queueIndex = INDEX_NOT_IN_QUEUE; @Override public int compareTo (Delayed o) { if (this == o) { return 0 ; } ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0 ) { return -1 ; } else if (d > 0 ) { return 1 ; } else if (id < that.id) { return -1 ; } else { assert id != that.id; return 1 ; } } }
外部线程执行新任务的时候,任务需要立即执行的话,需要唤醒 selector,避免因 selector 长时间等待错过执行时机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void execute (Runnable task, boolean immediate) { if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } protected void wakeup (boolean inEventLoop) { if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) { selector.wakeup(); } }
重建 selector,解决 bug 8566 的逻辑,后续有时间再另外分析。
2.4 处理及分发网络 I/O 事件 在 NioEventLoop 中,处理 I/O 事件的时间与执行任务的时间比率为 1:1,即两者的执行时间是相等的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private volatile int ioRatio = 50 ;final long ioStartTime = System.nanoTime();try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
现在分析 I/O 事件的处理,这里提供了两种方式,一种是经过 Netty 优化过数据结构的方式,一种是 Java 原生的方式,它们之间的区别主要是存放 SelectionKey 对象的底层数据结构的差异,而处理流程没有变。
1 2 3 4 5 6 7 8 private void processSelectedKeys () { if (selectedKeys != null ) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
优化的方法主要是通过反射的方式,将 Selector 中 selectedKeys 和 publicSelectedKeys 字段替换为 Netty 版的 SelectedSelectionKeySet。数据结构做了那些优化可以再深入分析源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 final Selector unwrappedSelector;try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException ("failed to open a new selector" , e); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet ()try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys" ); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys" ); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1 ) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null ; } } Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true ); if (cause != null ) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true ); if (cause != null ) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null ; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; }
处理 I/O 事件的大致流程如下:
遍历 selectedKeys 集合,处理所有 Channel 的 I/O 事件,一个 SelectionKey 对象代表一个 Channel 的 I/O 事件;
取出 SelectionKey 对象中的附件,该附件由 AbstractNioChannel.doRegister 方法注册到 Selector 对象上,附件就是 AbstractNioChannel 自身,触发 I/O 事件时,再由 SelectionKey 对象返回;
根据附件对象的不同,调用不同的处理逻辑,这里主要是处理 Channel 的 I/O 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 private void processSelectedKeysOptimized () { for (int i = 0 ; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null ; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1 ); selectAgain(); i = -1 ; } } } protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { } } } public abstract SelectionKey register (Selector sel, int ops, Object att) throws ClosedChannelException;
真正的处理逻辑在 processSelectedKey 方法中处理,这里有两个重点:
写缓存空间充足,注册 OP_WRITE 事件会频繁触发,导致 cpu 空转,所以正常情况下,不需要注册 OP_WRITE 事件,只有在写缓存满的时候才会注册该事件,触发之后进行刷新操作;
在 Netty 中,将 OP_ACCEPT 当作读操作,只不过它读取的数据比较特殊,是 SocketChannel 对象。
事件的处理逻辑包含在 AbstractNioChannel.NioUnsafe 中,由该方法调用 ChannelPipeline 中的回调方法,至此,将事件处理由网络层传递给 Netty 框架层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
以 unsafe.read() 为例,在 NioMessageUnsafe 类的实现中,读取操作会调用 AbstractNioChannel 子类的 doReadMessages 方法读取网络数据,并写入到 readBuf 中,再调用 ChannelPipeline 中的 fireChannelRead 方法将数据传递给 Netty 框架层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList <Object>(); @Override public void read () { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); boolean closed = false ; Throwable exception = null ; try { try { do { int localRead = doReadMessages(readBuf); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0 ; i < size; i ++) { readPending = false ; pipeline.fireChannelRead(readBuf.get(i)); } } finally { } } }
在 NioServerSocketChannel 类 doReadMessages 实现中,读取到的数据是 SocketChannel,该对象会分配给 WorkerGroup 中,由 WorkerGroup 中的 EventLoop 去读取网络数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected int doReadMessages (List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null ) { buf.add(new NioSocketChannel (this , ch)); return 1 ; } } catch (Throwable t) { } return 0 ; }
而在 NioSocketChannel 类的 doReadBytes 实现中,读取到的是 ByteBuf,传递给上层的是 ByteBuf 对象,上层对象再对其进行反序列化、业务处理等操作。
1 2 3 4 5 6 protected int doReadBytes (ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
至此,根据 I/O 事件的类型,将数据分发到了上层中,上层业务可以继续处理。
2.5 执行任务 2.5.1 执行流程 执行任务相关的变量:
1 2 3 4 5 private final Queue<Runnable> taskQueue;PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
在 EventLoop 中,有两类任务,一是常规的任务,没有时间属性,二是周期性或延时的定时任务,它们分别存放到两个不同的队列。任务执行时,先将到期的定时任务从 scheduledTaskQueue 队列移动到 taskQueue 中,再统一执行 taskQueue 队列中的任务。
任务执行的大致如下:
将到期的定时任务移动到 taskQueue 中;
计算此次执行的时长,如果执行的时间超过设定的执行时长,则退出进行下一轮的事件处理;
遍历执行 taskQueue 中的任务,在两种情况下退出任务的执行:1)任务的执行时长超过了设定的执行时长;2)taskQueue 队列为空;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 protected boolean runAllTasks (long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null ) { afterRunningAllTasks(); return false ; } final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0 ; long runTasks = 0 ; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; if ((runTasks & 0x3F ) == 0 ) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break ; } } task = pollTask(); if (task == null ) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break ; } } afterRunningAllTasks(); this .lastExecutionTime = lastExecutionTime; return true ; }
移动到期的定时任务逻辑相对简单:遍历 scheduledTaskQueue 队列,将到期的任务从 scheduledTaskQueue 队列中移除,再添加到 taskQueue 队列中,如果添加失败,则再添加回 scheduledTaskQueue 队列,等待下次再操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private boolean fetchFromScheduledTaskQueue () { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null ) { return true ; } if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false ; } } }
在定时任务任务中超时的判断是基于相对时间的,起始时间为程序启动的时间。在 scheduledTask 中关联有一个任务执行的截止时间,将这个截止时间与当前计算的时间进行比较,小于当前的时间则说明已经过期,满足执行的条件,则需要将该任务移动到 taskQueue。
另外,scheduledTaskQueue 是一个优先级队列,已经根据截止时间排序,队首的元素是最先到期的任务,如果取到了未到期的任务,则停止遍历,因为后面的任务截止时间更大,没有必要进行比较了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 protected final Runnable pollScheduledTask (long nanoTime) { assert inEventLoop () ; ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0 ) { return null ; } scheduledTaskQueue.remove(); scheduledTask.setConsumed(); return scheduledTask; } private static final long START_TIME = System.nanoTime();static long nanoTime () { return System.nanoTime() - START_TIME; } PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null ) { scheduledTaskQueue = new DefaultPriorityQueue <ScheduledFutureTask<?>>( SCHEDULED_FUTURE_TASK_COMPARATOR, 11 ); } return scheduledTaskQueue; }
2.5.2 任务的添加 上面分析了任务执行的流程,下面看下这两类任务怎么添加到任务队列中。
1、常规任务
常规任务是通过 execute 方法添加的,该方法含义上有执行的意思,但实际上执行该方法,只是将任务添加到 taskQueue 中,任务的执行最终是在 EventLoop 线程中完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void execute (Runnable task) { ObjectUtil.checkNotNull(task, "task" ); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute (Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } protected void addTask (Runnable task) { ObjectUtil.checkNotNull(task, "task" ); if (!offerTask(task)) { reject(task); } } final boolean offerTask (Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
2、定时任务 定时任务有两种:1)延时任务;2)周期性任务。它们是通过 schedule 方法添加,方法定义如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); @Override <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) ; @Override ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); @Override ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
两种类型的定时任务统一封装为 ScheduledFutureTask 任务,添加一个定时任务实际就是添加一个 ScheduledFutureTask 对象到 scheduledTaskQueue 中。添加 ScheduledFuture 的过程中,如果当前线程就是 EventLoop 线程,则直接操作即可,如果当前线程不是 EventLoop 线程,则添加一个常规任务,用来执行该操作。这样设计,应该是出于线程安全的考虑,保证只有 EventLoop 线程执行添加操作。在这里,还需要考虑定时任务已经过期,需要唤醒 EventLoop 线程执行任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private <V> ScheduledFuture<V> schedule (final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduleFromEventLoop(task); } else { final long deadlineNanos = task.deadlineNanos(); if (beforeScheduledTaskSubmitted(deadlineNanos)) { execute(task); } else { lazyExecute(task); if (afterScheduledTaskSubmitted(deadlineNanos)) { execute(WAKEUP_TASK); } } } return task; } final void scheduleFromEventLoop (final ScheduledFutureTask<?> task) { scheduledTaskQueue().add(task.setId(++nextTaskId)); } protected boolean beforeScheduledTaskSubmitted (long deadlineNanos) { return deadlineNanos < nextWakeupNanos.get(); }
ScheduledFutureTask 对象封装了三个功能:
执行添加任务,将 ScheduledFutureTask 对象本身添加到 scheduledTaskQueue 队列;
执行延时任务,由于延时任务只会执行一次,执行完便结束;
执行周期性任务,执行完本轮的任务之外,还需要将 ScheduledFutureTask 添加回 scheduledTaskQueue 队列,等待下一轮执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 final class ScheduledFutureTask <V> extends PromiseTask <V> implements ScheduledFuture <V>, PriorityQueueNode { private static final long START_TIME = System.nanoTime(); private long id; private long deadlineNanos; private final long periodNanos; private int queueIndex = INDEX_NOT_IN_QUEUE; static long nanoTime () { return System.nanoTime() - START_TIME; } static long initialNanoTime () { return START_TIME; } @Override public void run () { assert executor () .inEventLoop(); try { if (delayNanos() > 0L ) { if (isCancelled()) { scheduledExecutor().scheduledTaskQueue().removeTyped(this ); } else { scheduledExecutor().scheduleFromEventLoop(this ); } return ; } if (periodNanos == 0 ) { if (setUncancellableInternal()) { V result = runTask(); setSuccessInternal(result); } } else { if (!isCancelled()) { runTask(); if (!executor().isShutdown()) { if (periodNanos > 0 ) { deadlineNanos += periodNanos; } else { deadlineNanos = nanoTime() - periodNanos; } if (!isCancelled()) { scheduledExecutor().scheduledTaskQueue().add(this ); } } } } } catch (Throwable cause) { setFailureInternal(cause); } } }
至此,任务的执行分析完毕。
3. 总结 EventLoop 担当了网络层与 Netty 框架间的桥梁作用,本质是一个事件循环,不断监听 Channel 的网络 I/O 事件,并进行分发处理。另外,也承担了执行任务的作用,包括常规的任务及定时任务。理解 EventLoop 的事件循环会极大加深对 Netty 的理解。