Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理
2017-09-26 22:23
1356 查看
Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理
在上篇博文分析服务端启动的过程中,我们遇到了如下的代码片段,if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise);//分析 } }); }
以及
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
这些代码片段中,都调用了NioEventLoop类的execute这个方法来提交任务。 本篇博文将以此函数作为切入点,来分析NioEventLoop 启动以及 EventLoop所负责的IO操作和task任务的处理思路。
NioEventLoop类的execute方法
由于NioEventLoop并没有实现此方法,因此,准确来说,应该是NioEventLoop类的超类SingleThreadEventExecutor中的方法。具体代码如下:@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop();//判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,如果不是,则先启动线程,然后添加任务到任务队列中去 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); //如果发现该线程已经关闭则移除任务并拒绝 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
该方法的主要逻辑如下:
1、判断入参数是否为null,如果为null,则抛空指针异常,如果不为null,则进行第2步。
2、通过调用inEventLoop()方法判断当前工作线程是否为该NioEventLoop所关联的线程,如果是,则调用addTask(task)方法将任务添加到任务队列中,如果不是,则进行第3步。
3、启动该NioEventLoop所关联的线程,然后添加任务到任务队列中去 。
4、如果发现该线程已经关闭则移除任务并拒绝
在看startThread()这个方法的具体代码之前,我觉得有必要介绍下EventLoop.execute第一次被调用的地方,因此第一次被调用的地方就是触发startThread()的调用,进而导致了EventLoop所对应的Java线程的启动。
基于这样一个问题,我们开始在服务端启动时开始跟踪代码来寻找。最快的方式是,在EventLoop.execute方法的
startThread();这一行代码打上一个断点,然后看下其调用链即可。
调用链截图如下:
看到了没有,是在服务器端启动的过程中,调用AbstractChannel#AbstractUnsafe.register方法中调用了execute方法,这是因为整个代码都是在主线程中运行的因此在下面的register方法中 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute。eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute,继而调用startThread方法完成NioEventLoop所对应的线程的启动。
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { //省略了部分检查代码 AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { //省略了部分异常处理代码 } } }
结论:当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动。
EventLoop的启动方法:startThread
上面找到了startThread调用的调用链哈,这里看下startThread()方法SingleThreadEventExecutor.java private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { delayedTaskQueue.add(new ScheduledFutureTask<Void>( this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); thread.start(); } } }
上面startThread方法中的
STATE_UPDATER是 SingleThreadEventExecutor 内部维护的一个属性, 它的作用是标识当前的 thread 的状态. 在初始的时候,
STATE_UPDATER == ST_NOT_STARTED, 因此第一次调用 startThread() 方法时, 就会进入到 if 语句内, 进而调用到 thread.start()完成NioEventLoop所对应的 Java线程的启动.
以上就完成了NioEventLoop所对应的Java线程的启动。那么启动之后,该线程主要用于干什么呢?
这里先说结论:在 Netty 中, 一个 EventLoop 需要负责两个工作, 第一个是作为 IO 线程, 负责相应的 IO 操作; 第二个是作为任务线程, 执行 taskQueue 中的任务.
上面的分析中调用startThread()方法继而调用thread.start()来启动EventLoop 所对应的Java 线程。下面来线程启动具体做了什么哈,也就是看下如下的run方法主要干了什么哈,
thread = threadFactory.newThread(new Runnable() { @Override public void run() { //... SingleThreadEventExecutor.this.run();//调用了NioEventLoop类中的run方法,下面将主要分析 //... });
上面的源码本来很长很长,上面是精简版本,简单来说,这里我们只关注一行代码:
SingleThreadEventExecutor.this.run(),这里就是调用了NioEventLoop的run方法,下面继续跟。
NioEventLoop.run()方法
NioEventLoop类中的run方法的代码如下:@Override protected void run() { for (;;) { boolean oldWakenUp = wakenUp.getAndSet(false); try { // 当有任务时为了保证任务及时执行采用不阻塞的selectNow获取准备好I/O的连接 if (hasTasks()) { selectNow(); } else { // 当无任务时采用阻塞等待的方式获取连接 select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys();//分析 runAllTasks();//分析 } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }
上面代码的 for(;;) 是一个无条件的死循环, 这里就是 NioEventLoop 事件循环的核心。其基本逻辑为: 首先调用hasTasks()方法判断当前任务队列taskQueue是否为空。有两种情况, 一种是有任务,则调用selectNow,一种是无任务,则调用select(oldWakeUp),这selectNow和select(oldWakeUp)两者都是获取已经准备好的连接,不同的是select(oldWakeUp)会产生阻塞.
为什么会根据taskQueue是否有任务来调用不同的select方法呢?
答案:如前面所说在 Netty 中, 一个 EventLoop 需要负责两个工作, 第一个是作为 IO 线程, 负责相应的 IO 操作; 第二个是作为任务线程, 执行 taskQueue 中的任务。基于EventLoo的职责所在因此当taskQueue无任务的时候Netty可以选择阻塞的等待IO就绪事件,不影响其他的东西(因此此时taskQueue为空,没有任务需要执行,是吧),而当taskQueue有任务的时候如果选择阻塞的等待IO就绪事件,则就导致taskQueue中的任务不能尽快得到执行,是吧,因此当taskQueue有任务的时候就会调用飞阻塞的selectNo方法,以保证taskQueue中的任务可以尽快执行。
protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
1、如果taskQueue不为空,则调用selectNow()方法
selectNow()方法的代码如下:void selectNow() throws IOException { try { selector.selectNow(); } finally { // restore wakup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
该方法调用了selector.selectNow(),这个 selector 字段正是 Java NIO 中的多路复用器 Selector(KQueueSelectorImpl实例)。
selector.selectNow()方法在博文Java NIO 之 Selector(第二部分selector.select()) 有详细的介绍。
selector.select()函数功能:选择一些I/O操作已经准备好的管道。每个管道对应着一个key。这个方法 是一个阻塞的选择操作。当至少有一个通道被选择时才返回。当这个方法被执行时,当前线程是允许被中断的。
selector.selectNow()这个方法与select()的区别在于,是非阻塞的,即当前操作即使没有通道准备好也是立即返回,只是返回的是0,不会阻塞当前线程.
selectNow()方法在KQueueSelectorImpl类的父类中。
public int selectNow() throws IOException { return this.lockAndDoSelect(0L); } private int lockAndDoSelect(long var1) throws IOException { synchronized(this) { if(!this.isOpen()) {//检查这个Selector是否打开 throw new ClosedSelectorException(); } else { //双重锁 Set var4 = this.publicKeys; int var10000; synchronized(this.publicKeys) { Set var5 = this.publicSelectedKeys; synchronized(this.publicSelectedKeys) { var10000 = this.doSelect(var1);//分析 } } return var10000; } } }
上面首先检查了这个Selector是否打开,如果没有打开,则抛异常,否则利用双重锁定调用了doSelect(var1)这个方法。
KQueueSelectorImpl.java protected int doSelect(long var1) throws IOException { boolean var3 = false; if(this.closed) { throw new ClosedSelectorException(); } else { this.processDeregisterQueue(); int var7; try { this.begin(); var7 = this.kqueueWrapper.poll(var1); } finally { this.end(); } this.processDeregisterQueue(); return this.updateSelectedKeys(var7); } }
重点在于
var7 = this.kqueueWrapper.poll(var1);这行代码,不太懂这个里面干了些什么。先不管。
KQueueArrayWrapper.java
int poll(long var1) { this.updateRegistrations(); int var3 = this.kevent0(this.kq, this.keventArrayAddress, 128, var1); return var3; }
到这里就看完了selectNow()方法,下面将分析如果taskQueue不为空是调用select(boolean oldWakenUp)方法逻辑。
2、如果taskQueue为空,则调用select(oldWakenUp)方法
select(oldWakenUp)方法的代码如下:private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } }
上面的代码很长,逻辑也比较复杂哈,但如下的简化代码就是该方法select与selectNow()这个方法的主要区别:前者selectNow()方法是通过调用selector.selectNow()立即返回不会阻塞当前线程,而在这个 select 方法中, 调用了 selector.select(timeoutMillis), 而这个调用是会阻塞住当前线程的, timeoutMillis 是阻塞的超时时间。
private void select(boolean oldWakenUp) throws IOException { //... int selectedKeys = selector.select(timeoutMillis); //... }
上面的简化代码只是想说明在netty中select与selectNow方法的区别,下面还是详细的分析下select方法。具体分解为如下的8点:
1、delayNanos(currentTimeNanos)
该方法的代码如下:
protected long delayNanos(long currentTimeNanos) { ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { return SCHEDULE_PURGE_INTERVAL;//TimeUnit.SECONDS.toNanos(1);即1s } return delayedTask.delayNanos(currentTimeNanos); } ScheduledFutureTask.java public long delayNanos(long currentTimeNanos) { return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));//START_TIME为任务的创建时间 } public long deadlineNanos() { return deadlineNanos; }
该方法主要是计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟执行的时间)。注意:每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列:
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>()),在启动线程的时候会往队列中加入一个任务)。最终的结果:{创建该任务指定的延迟时间-(当前时间-delayedTask创建的时间)}。如果队列中没有任何任务,则默认返回1秒钟。
在前面我们介绍启动线程的startThread()方法(代码如下)时,就添加了一个定时任务,该任务所做的事情为:将定时任务队列中的已取消任务从队列中移除,该任务每间隔1秒执行1次 ,即这个任务的deadlineNanos = SCHEDULE_PURGE_INTERVAL = 1s。
SingleThreadEventExecutor.java private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { delayedTaskQueue.add(new ScheduledFutureTask<Void>( this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); thread.start(); } } }
继续往后面看
2、从如下的代码可以得到:如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; }
3、如果大于500000纳秒,则执行selector.select(timeoutMillis),这个方法在博文Java NIO 之 Selector(第二部分selector.select())中有详细的介绍。简单来说:select(long timeout)在等待有通道被选择时至多会阻塞timeout毫秒。
selector.select()函数功能:选择一些I/O操作已经准备好的管道。每个管道对应着一个key。这个方法 是一个阻塞的选择操作。当至少有一个通道被选择时才返回。当这个方法被执行时,当前线程是允许被中断的。
而select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
4、selector.select(timeoutMillis)执行完毕之后,则开始执行下面的代码:
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; }
从上面的代码可知:如果存在以下几种情况则退出for循环:
1)已经ready的selectionKey,2)selector被唤醒,3)任务taskQueue不为空,4)scheduledTaskQueue不为空。5)线程被中断
5、如果
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,即说明selector.select(timeoutMillis)等待了timeoutMillis长的时间,将selectCnt置为1,便于下次循环时退出循环。如果等待时间小于timeoutMillis时间,则进行第6步和7步。
6、如果 selectCnt 没达到阈值SELECTOR_AUTO_REBUILD_THRESHOLD(默认512),则继续进行for循环。其中 currentTimeNanos 在select操作之后会重新赋值当前时间,如果selector.select(timeoutMillis)行为真的阻塞了timeoutMillis,第二次的timeoutMillis肯定等于0,此时selectCnt 为1,所以会直接退出for循环。
7、如果selectCnt达到阈值,则说明,执行了这一段for循环达512次还没有break,这是什么原因呢?这是java早期的nio bug会导致cpu 100%, 此时select(timeout)根本不会阻塞(如果阻塞了则不能达到阈值哈)而直接返回0, 在netty中判断是否发生了此bug的方式为:为在很短时间内(小于1秒)完成了多次(默认512)select(timeout),则发生了该bug,此时进行rebuildSelector来消除bug。即如果selectCnt 达到阈值SELECTOR_AUTO_REBUILD_THRESHOLD,则会进行重建。就如下面rebuildSelector()方法上的注释所言:重建的目的就是消灭epoll 100% cpu bug。
rebuildSelector()方法的代码如下,比较长,但主要干了如下2件事:
1、通过调用openSelector()方法创建一个新的selector。
2、将old selector的selectionKey执行cancel,并将oldSelector中的channel重新register到newSelector上。
具体细节这里不研究。
重建的思路基本都是这样:1)创建一个新的;2)把旧的有用的参数赋值到新的上面;3)摧毁(或抛弃)旧的。
/** * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. */ public void rebuildSelector() { //inEventLoop()方法判断启动线程与当前线程相同,相同表示已经启动,不同则添加一个任务到该NioEventLoop的线程中 if (!inEventLoop()) { execute(new Runnable() { @Override public void run() { rebuildSelector();//重建 } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { //第一步:通过调用openSelector()创建新的selector。 newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (;;) { try { //第二步:将oldSelector中的所有key cancel,并将key上的channel重新register在newSelector上。 for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel();//将old selector上的key cancel掉 //将old selector上的channel重新注册到new selector上 SelectionKey newKey = key.channel().register(newSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } } catch (ConcurrentModificationException e) { // Probably due to concurrent modification of the key set. continue; } break; } selector = newSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); }
8、在重建完成之后,再一次调用方法selectNow,检查是否有已ready的selectionKey,为后面的处理做准备。
3、run方法的后半部分的分析
到这里就分析了NioEventLoop类中run方法的前面一部分:通过 select/selectNow 调用查询当前是否有就绪的 IO 事件。下面继续run方法的后半部分(如下):如果前面一部分通过select/selectNow查询有 IO 事件就绪时, 这一步就是处理这些 IO 事件了。if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
前面说过:在 Netty 中, 一个 EventLoop 有两个职责, 第一个是作为 IO 线程, 负责相应的 IO 操作; 第二个是作为任务线程, 执行 taskQueue 中的任务。
每个职责对应的一个处理函数,在上面的代码中反应了出来,第一个是 processSelectedKeys()函数负责处理第一步select/selectNow查询所得到的就绪的 IO 事件,第二个是 runAllTasks()函数负责处理taskQueue中的任务。
既然这个EventLoop所对应的Java 本地线程需要处理这两种任务,因此就会有一定的时间分配哈,是吧,因此代码中的 ioRatio 变量表示的就是此线程分配给 IO 操作所占的时间比(即 运行processSelectedKeys 耗时在整个循环中所占用的时间),例如:ioRatio默认为 50 ,表示的就是 执行 IO 操作和执行 tash 任务 所占的时间比值为1:1。
当知道了 IO 操作耗时和它所占用的时间比, 那么执行 task 的时间就可以很方便的计算出来了:
设 IO 操作耗时为 ioTime, ioTime 占的时间比例为 ioRatio, 则: ioTime / ioRatio = taskTime / taskRatio taskRatio = 100 - ioRatio => taskTime = ioTime * (100 - ioRatio) / ioRatio
当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用 processSelectedKeys()、runAllTasks(); 而当 ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下 processSelectedKeys() 所执行的时间(即 IO 操作的耗时), 然后根据公式, 计算出执行 task 所占用的时间, 然后以此为参数, 调用 runAllTasks()。
以上就是上面这块代码的核心思想,下面将具体来分析下用于IO操作的processSelectedKeys() 函数,然后分析用于 处理 task任务的 runAllTasks() 函数。
1、processSelectedKeys() 分析
此方法的源码如下:private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
先看processSelectedKeysOptimized方法,processSelectedKeysPlain方法与该方法类似,本博文就不分析此方法了。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; // 在获取所有key(即flip)时会将最后一个有效key的下一个位置值为null,因此碰到null,说明所有有效的key已经获取完 ,因此利用break跳出循环 if (k == null) { break; } // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys[i] = null;//方便gC final Object a = k.attachment(); // key关联两种不同类型的对象,一种是AbstractNioChannel,一种是NioTask if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 如果需要重新select则重置当前数据 if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 for (;;) { if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; i++; } selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }
此方法的代码也比较长哈,但是关键的点就两个: 迭代 selectedKeys 获取就绪的 IO 事件, 然后为每个事件都调用 processSelectedKey 来处理它.
上面的处理过程中有一个needsToSelectAgain,什么情况下会触发这个条件呢。当多个channel从selector中撤销注册时,由于很多数据无效了(默认为256),需要重新处理,设置needsToSelectAgain=true的函数如下:
void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } }
接下来看下processSelectedKey
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); //检查该SelectionKey是否有效,如果无效,则关闭channel if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // Connection already closed - no need to handle write. return; } } // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100% if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
该函数主要SelectionKey k进行了检查,然后如下几种不同的情况
1)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。
2)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。
3)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.
2、runAllTasks()分析
最后分析下NioEventLoop类run方法中的最后调用的一个方法:runAllTasks();//SingleThreadEventExecutor.java /** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. * * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }
该函数的功能:从任务队列中拉取所有的任务,然后运行这些任务的run方法来执行他们。如果至少有一个任务被执行就返回true。具体思路如下2点:
1)、通过调用如下的fetchFromDelayedQueue函数从定时任务队列拉取已经到“执行时间”的任务到任务队列;例如:你有一个任务9点中开始,最晚执行时间为10点,如果现在时间为10点半,则此任务就会从延迟任务队列delayedTaskQueue加入到任务队列taskQueue中去。
private void fetchFromDelayedQueue() { long nanoTime = 0L; for (;;) { ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { break; } if (nanoTime == 0L) { nanoTime = ScheduledFutureTask.nanoTime();//System.nanoTime() - START_TIME; } if (delayedTask.deadlineNanos() <= nanoTime) { delayedTaskQueue.remove(); taskQueue.add(delayedTask); } else { break; } } }
2)、通过调用如下的pollTash()方法获取任务队列中任务,然后调用该任务的run方法执行此任务,就这样依次执行完任务队列中所有的任务。
protected Runnable pollTask() { assert inEventLoop(); for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue; } return task; } }
runAllTasks()方法的另一个重载的方法的代码如下所示,runAllTasks(long timeoutNanos)该方法与runAllTasks()的不同在于:有执行时间的限制,即只允许执行一定的时间,时间到即使任务还没有全部执行完则退出。
/** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. */ protected boolean runAllTasks(long timeoutNanos) { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null) { return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }
到这里,我们就从以NioEventLoop类execute方法作为切入点,研究了EventLoop所对应的Java 本地线程的启动,研究了此线程启动后是如何来处理EventLoop所负责的IO操作和task任务的。
参考资料
1、https://segmentfault.com/a/11900000074039372、http://blog.csdn.net/youaremoon/article/details/50311341
3、http://xw-z1985.iteye.com/blog/1928244
相关文章推荐
- Netty源码分析之NioEventLoop(转)
- 【Netty4.X】Netty源码分析之NioEventLoop(六)
- 【Netty4.X】Netty源码分析之NioEventLoopGroup(五)
- Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor
- Netty源码分析:NioEventLoopGroup
- Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析
- Netty线程模型源码分析之NioEventLoopGroup的初始化
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- netty 启动分析 NioEventLoopGroup
- 【Netty源码解析】NioEventLoop
- 【Netty源码解析】NioEventLoop
- Netty源码分析之EventLoop相关结构分析
- 【Netty源码解析】NioEventLoop
- netty源码分析(一)EventLoopGroup
- netty EventLoop write() 源码分析(二)
- netty EventLoop 源码分析(一)
- Netty源码学习(三)NioEventLoop
- Hadoop0.21.0源码流程分析(3)-Task节点管理启动任务
- android netty5.0 编译时 java.lang.NoClassDefFoundError: io.netty.channel.nio.NioEventLoopGroup