Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor
2017-03-21 16:29
1516 查看
1、内存模型总结
Netty内存模型基于主从Reactor模型;Channel会绑定一个线程模型(EventLoopGroup),与该通道的读,写等事件都在一个EventLoopGroup中执行,避免了Handler执行的线程安全问题。
线程模型前置篇:
Nio实现Reactor模式
图说netty线程模型
2、源码分析NioEventLoopGroup初始化流程
2.1 NioEventLoopGroup构造方法
重点关注构造方法如下参数:
1、int nEventLoops EventLoop个数
2、Executor executor 任务执行器
3、SelectorProvider selectorProvider Nio Selector
并且在使用 EventLoopGroup boosGroup = new EventLoopGroup();时,在未调用其父类构造时,nEventLoops为0, executor为null,selectorProvider为特定平台下的Selector实现类。
2.2 进入到直接父类MultithreadEventLoopGroup构造方法
该构造方法,只是初始化nEventLoops参数,如果为0,则使用CPU可用核心数的2倍。
2.3 进入到父类MultithreadEventExecutorGroup中,这里是具体初始化的地方
代码@1、创建该线程模型的线程执行器,此处返回的是io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors。
代码@2、开始创建该组的工作EventExecutor数组,其中真正存放的实例由代码@4中创建。
代码@3、创建从线程组(EventExecutor)执行器中选择一个执行。
代码@4、创建一个具体的执行器、有具体的线程模型(EventLoopGroup)子类实现。这里是下一步要重点关注的对象,NioEventLoopGroup实例化的对象为NioEventLoop:
代码@5、如果创建执行器失败,则关闭资源。
代码@6、添加相关事件通知处理器。
执行到这里,NioEventLoopGroup的初始化完成,目前可以得出如下
1个NioEventLoopGroup可以有多个 NioEventLoop,轮流接受客户端请求;
同一个NioEventLoopGroup中的NioEventLoop共同持有一个Executor,这个Executor是何许人也(io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors)。接下来将重点探究NioEventLoop的类层次结构与其实现。
3、NioEventLoop初探k
NioEventLoop继承自SingleThreadEventLoop,从字面上讲,NioEventLoopGroup的每一个NioEventLoop是一个线程的事件循环器,而每一个NioEventLoop中的执行器(EventExecutor)是一个并发度为nEventLoops的ForkJoinPool。
NioEventLoop的构造方法如下:
这里大家是否想过一个问题,为什么单个线程的EventLoop(SingleThreadEventLoop)的事件循环器,里面需要用一个线程池呢?这里用单个线程当事件循环器有什么作用呢?。我目前的理解是,事件循环器(EventLoop)其实就是一个IO线程,首先使用单个线程来实现,简单高效,没有线程切换开销,多线程访问等问题;并且Netty将一个通道的读、写等操作都绑定到一个相同的事件循环器,这样有利于状态的保存,比如说可以比较方便的在Handler(Handler在IO线程中执行)使用线程本地变量(ThreadLocal)、同时减少线程切换。而使用一个线程池,而不是一个简单的线程,主要是为了提高程序的健壮性,如果单一线程由于异常,导致该线程消亡后,线程池会另起一个新的线程继续提供服务。[但是,从后面的分析看,NioEventLoop中的线程也是轮流执行的。]
接下来,将从SingleThreadEventExecutor,整个线程模型的执行者开始相信接口执行器内部运作逻辑。
4、SingleThreadEventExecutor源码分析
SingleThreadEventExecutor是NioEventLoopGroup的具体执行器,也就是NioEventLoopGroup中运行的线程,其实就是SingleThreadEventExecutor。本文将从重点属性、构造方法、核心方法三方面剖析该类的实现。
4.1 重要属性
子类可以通过重写newTaskQueue方法,重写底层的任务队列。
4.3 核心方法
jdk并发包中线程池的实现,比如submit等方法,都是先包装成相关Task,然后调用execute方法。SingleThreadEventExecutor类,最终父类就是Executor,所以,我们从executor方法开始研究。
4.3.1 executor 方法
代码@1,是否在事件循环中,具体实现如下:
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
怎么解释呢?提交任务的操作,比如需要提交一个读任务,或写任务,如果调用的API的线程是当前的EventLoop,则直接加入到任务队列中等待执行,如果是其他线程调用的,则启动该EventLoop线程进行调度,然后放入到任务队列中。举个简单的例子,在IO线程中(解码出请求信息后),将任务放入到业务队列中去处理的时候,如果调用channel.write方法,此时要注意的是,真实的writer方法的调用,不会在业务线程中调用,因为次数该业务线程并不是EventLoop的执行线程,只会将任务放入到队列,然后业务线程中直接返回(理解这里的异步操作)。
代码@2,如果是EventLoop执行的任务,直接加入任务队列
代码@3,如果是其他线程,则启动调度,稍后进行详细的代码分析。
代码@5,代码@6,如果停止执行,则拒绝服务。
代码@6,7,唤醒选择器,感觉这里addTaskWakesUp 这个变量有点问题,不过具体的唤醒选择器逻辑是对于事件模型来说比较重要,在相关的子类中都有重写。故这部分在研究NioEventLoop时再重点关注。
接下来重点研究步骤3,startExecution方法的实现
4.3.2 startExecution方法详解
代码@1,该startExecution方法支持多次调用,因为对状态进行了CAS检测并设置,如果启动状态为未启动,并设置为启动中成功,则继续下文的启动流程。
代码@2,新建一个任务,该任务中取消任务队列中的任务,并移除。具体代码下文会重点分析。
代码@3,启动调度器,此处先将当前运行的线程设置为空,代码@5,可以使用lazySet来更新的原因是,对其可见性要求没那么高,因为在添加一个任务的时候,就算检测到当前线程不是EventLoop线程,也就是asRunnable线程,也没关系,会先调用startExecution方法,等其asRunable运行,然后再放入队列中。
代码@4,核心所在呀,这里让SingleThreadEventExecutor真正的名副其实是单个线程。尽管每个EventLoop的执行器是一个并发度为nEventCount的ForkJoinPool线程池。
代码@6,在ForkJoinPool中执行的任务,就是asRunnable中run方法的逻辑,而该方法里面首先先设置当前线程,然后执行SingleThreadEvent的run方法,run方法的具体实现由子类实现。
4.3.3 关于代码@2,关于schedule方法详解
整个调度任务的执行过程如下:先将任务封装成ScheduledFutureTask,然后如果当前线程是当前执行器线程,则直接加入到优先级队列中,如果不是,则调用execute方法,由执行器线程加入到调度任务的优先级队列中。这里是Netty线程模型的核心所在,通道的相关IO操作等最终都有由IO线程放入到队列并执行之。避免了多线程的竞争。
5、源码分析NioEventLoop
5.1 NioEventLoop重要属性详解
5.2 核心入口方法 run
继上文的分析,一个任务提交到EventExecutor,首先会先确认是否开始执行(startExecution),在启动调度之前,会运行具体的线程调度处理逻辑run方法里的逻辑。NioEventLoop的间接父类为SingleThreadEventExecutor。
代码@1,获取当前的wakeup状态并充值为false
代码@2,如果任务队列中有任务,则执行一次快速selectNow操作,该方法不阻塞。
代码@3,如果任务队列中,没有任务,则执行select方法。select方法,不会阻塞,因为调用的是selectNow或select(超时时间)
代码@4,如果需要weakup,则调用selector的weakup()方法。
代码@5,6是处理具体的任务相关逻辑。相关方法在后文详细讲解。(根据ioRatio的值不同,处理的逻辑不同)
代码@6、关闭流程
代码@7、这里重新向EventExecutor提交任务,再次开始执行select方法
这里的设计,还得琢磨一下,为什么不直接在一个线程中发送执行。
先重点研究一下代码@3处select(wakeup)方法
5.2.1 select(wakeup)方法详解
代码@1,计算select方法应该传入的超时时间,方法主要是从优先级队列(调度队列)中,取第一个节点,计算该任务在多久后应该被调度。
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
代码@2,执行带超时时间的select方法。
代码@3,如果本次有选择出感兴趣的键、或有调度任务处理,则跳出,去执行相关的操作。
代码@4,此处的判断主要是判断是否是空轮询,由于select是带超时时间的,如果没有超过其超时时间,就返回并且没有选择到键,则认为发生了空轮训,然后执行@5的逻辑,如果连续发生空轮询,超过SELECTOR_AUTO_REBUILD_THRESHOLD的值(默认512)次的话,执行重建Selector操作,也就是@6 rebuildSelector()方法执行逻辑。
5.2.2 rebuildSelector 方法详解
该方法的实现思路如下:
首先,调用者必须是EventExecutor的当前线程,然后就是新建一个Selector,然后将原来注册在Selector的通道,事件重新注册到新的Selector( selector.keys()),并取消在原Selector上的事件(取消操作非常重要,因为如果不取消,Selector关闭后,注册在Selector上的通道都将关闭)然后关闭旧的Selector以释放相关资源。上述代码就不一一详细解读了。
5.2.3 关于run方法 @52,@62 processSelectedKeys()方法详解
代码@1,如果key不可用,直接将通道关闭。
代码@2,获取该key已准备就绪的操作事件。
代码@3,判断是否有读事件就绪,如果就绪,则直接调用该Channel的unsafe对象的read操作。如果通道已经关闭,则返回。
代码@4,如果有写事件就绪,调用通道 ch.unsafe().forceFlush()方法,强制进行写操作。
代码@5,如果是连接操作,则将该键的感兴趣的连接事件取消,然后调用finishConnect()实现。
5.2.4 关于run方法 @53,@63 runAllTask详解
上述代码就是运行积压在队列中的任务,在获取任务时没有使用队列的阻塞方法,故这些方法最终不会阻塞。
从这里可以更加具体说一下ioRatio这个参数的含义。io执行时间比例,取值为1到100,Netty把处理Selector.selecteKeys的时间,也就是processSelectedKey()方法执行的时间做执行时间的基数,标记为t1时间,从上文的processSelectedKey讲解,其实可以得出t1的时间就绪事件的网络读写时间。另一部分时间是运行任务队列中的时间。如果ioRatio为100的话,表示processSelectedKey,runAllTask时间依次执行,但如果ioRatio设置小于100的话,runAllTask的运行时间将减少(权重),网络事件(select)就绪事件更加容易得到执行,那我们不妨思考一下任务队列中的task是从如何被加入的?一般也是一些读写事件,但不是由IO线程(EventLoop触发的),而是在业务线程中调用网络读写相关API,此时会先加入队列,然后再被调度执行。但这两部分,其实都是IO时间,所以对于ioRatio这个参数,我认为是
运行任务队列的权重更为直观。
总结,本文深入细致的分析了Netty NIO的事件模型,从事件模型的初始化(构造的全过程)、核心属性、构造方法、核心入口方法等方面细致分析了NioEventLoopGroup、SingleThreadEventExecutor的原理。
Netty事件模型:核心思路基于主从Reactor模型,一个NioEventLoopGroup包含n个NioEventLoop,每一个NioEventLoop持有一个Selector和一个线程池(执行器EventExecute,其实是netty
ForkJoinPool,并发度为n),在选择器的NioEventLoop的run方法每次运行后,就会交给NioEventLoop中线程池的另外一个线程,这里的设计,其实我不太明白为什么要这样,一直在一个线程中执行我个人觉得更好。
问题:
不知道netty为什么要这样设计,,在NioEventLoop中,内部会持有一个EventExecutor(ForkjoinPool,并发度为nEventLoops),我开始以为,一个NioEventLoop,只会用到ForkJoinPool中的一个线程,除非那个线程异常退出了后,才会用一个新的线程来提供服务,这样保证健壮性),,但我仔细看源码时,我发现,每一次select后,会重新进行一次scheduleExecution方法调用,这样会使用ForkJoinPool中的另外一个线程,,这样的设计,不利于ThreadLocal的使用,特别是线程本地内存分配,比如同一个通道的读操作,从线程本地变量中分配一个ByteBuf,然后写操作又会用ForkJoinPool的第二个线程,这样的ByteBuf又不会重复使用,为什么会这样设计呢?为什么在EventExecutor的线程池(ForkJoinPool),只使用一个线程,只有当这个线程奔溃后,再切换另一个线程进行处理(线程池自动处理)
Netty内存模型基于主从Reactor模型;Channel会绑定一个线程模型(EventLoopGroup),与该通道的读,写等事件都在一个EventLoopGroup中执行,避免了Handler执行的线程安全问题。
线程模型前置篇:
Nio实现Reactor模式
图说netty线程模型
2、源码分析NioEventLoopGroup初始化流程
2.1 NioEventLoopGroup构造方法
/** * Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores * available, as well as the default {@link Executor} and the {@link SelectorProvider} which * is returned by {@link SelectorProvider#provider()}. * * @see io.netty.util.concurrent.DefaultExecutorServiceFactory */ public NioEventLoopGroup() { this(0); } /** * Create a new instance that uses the default {@link Executor} and the {@link SelectorProvider} which * is returned by {@link SelectorProvider#provider()}. * * @see io.netty.util.concurrent.DefaultExecutorServiceFactory * * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. * If {@code executor} is {@code null} this number will also be the parallelism * requested from the default executor. It is generally advised for the number * of {@link EventLoop}s and the number of {@link Thread}s used by the */ public NioEventLoopGroup(int nEventLoops) { this(nEventLoops, (Executor) null); } /** * Create a new instance that uses the the {@link SelectorProvider} which is returned by * {@link SelectorProvider#provider()}. * * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. * If {@code executor} is {@code null} this number will also be the parallelism * requested from the default executor. It is generally advised for the number * of {@link EventLoop}s and the number of {@link Thread}s used by the * {@code executor} to lie very close together. * @param executor the {@link Executor} to use, or {@code null} if the default should be used. */ public NioEventLoopGroup(int nEventLoops, Executor executor) { this(nEventLoops, executor, SelectorProvider.provider()); } /** * Create a new instance that uses the the {@link SelectorProvider} which is returned by * {@link SelectorProvider#provider()}. * * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. * If {@code executor} is {@code null} this number will also be the parallelism * 1a6ab requested from the default executor. It is generally advised for the number * of {@link EventLoop}s and the number of {@link Thread}s used by the * {@code executor} to lie very close together. * @param executorServiceFactory the {@link ExecutorServiceFactory} to use, or {@code null} if the default * should be used. */ public NioEventLoopGroup(int nEventLoops, ExecutorServiceFactory executorServiceFactory) { this(nEventLoops, executorServiceFactory, SelectorProvider.provider()); } /** * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. * If {@code executor} is {@code null} this number will also be the parallelism * requested from the default executor. It is generally advised for the number * of {@link EventLoop}s and the number of {@link Thread}s used by the * {@code executor} to lie very close together. * @param executor the {@link Executor} to use, or {@code null} if the default should be used. * @param selectorProvider the {@link SelectorProvider} to use. This value must not be {@code null}. */ public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) { super(nEventLoops, executor, selectorProvider); } /** * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. * If {@code executor} is {@code null} this number will also be the parallelism * requested from the default executor. It is generally advised for the number * of {@link EventLoop}s and the number of {@link Thread}s used by the * {@code executor} to lie very close together. * @param executorServiceFactory the {@link ExecutorServiceFactory} to use, or {@code null} if the * default should be used. * @param selectorProvider the {@link SelectorProvider} to use. This value must not be {@code null}. */ public NioEventLoopGroup( int nEventLoops, ExecutorServiceFactory executorServiceFactory, final SelectorProvider selectorProvider) { super(nEventLoops, executorServiceFactory, selectorProvider); }
重点关注构造方法如下参数:
1、int nEventLoops EventLoop个数
2、Executor executor 任务执行器
3、SelectorProvider selectorProvider Nio Selector
并且在使用 EventLoopGroup boosGroup = new EventLoopGroup();时,在未调用其父类构造时,nEventLoops为0, executor为null,selectorProvider为特定平台下的Selector实现类。
2.2 进入到直接父类MultithreadEventLoopGroup构造方法
/** * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)} */ protected MultithreadEventLoopGroup(int nEventLoops, Executor executor, Object... args) { super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executor, args); }
该构造方法,只是初始化nEventLoops参数,如果为0,则使用CPU可用核心数的2倍。
2.3 进入到父类MultithreadEventExecutorGroup中,这里是具体初始化的地方
private MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, boolean shutdownExecutor, Object... args) { if (nEventExecutors <= 0) { throw new IllegalArgumentException( String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors)); } if (executor == null) { executor = newDefaultExecutorService(nEventExecutors); // @1 shutdownExecutor = true; } children = new EventExecutor[nEventExecutors]; //@2 if (isPowerOfTwo(children.length)) { //@3 chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nEventExecutors; i ++) { boolean success = false; try { children[i] = newChild(executor, args); //@4 success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { //@5 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } final boolean shutdownExecutor0 = shutdownExecutor; final Executor executor0 = executor; final FutureListener<Object> terminationListener = new FutureListener<Object>() { //@6 @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); if (shutdownExecutor0) { // This cast is correct because shutdownExecutor0 is only try if // executor0 is of type ExecutorService. ((ExecutorService) executor0).shutdown(); } } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
代码@1、创建该线程模型的线程执行器,此处返回的是io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors。
代码@2、开始创建该组的工作EventExecutor数组,其中真正存放的实例由代码@4中创建。
代码@3、创建从线程组(EventExecutor)执行器中选择一个执行。
代码@4、创建一个具体的执行器、有具体的线程模型(EventLoopGroup)子类实现。这里是下一步要重点关注的对象,NioEventLoopGroup实例化的对象为NioEventLoop:
代码@5、如果创建执行器失败,则关闭资源。
代码@6、添加相关事件通知处理器。
执行到这里,NioEventLoopGroup的初始化完成,目前可以得出如下
1个NioEventLoopGroup可以有多个 NioEventLoop,轮流接受客户端请求;
同一个NioEventLoopGroup中的NioEventLoop共同持有一个Executor,这个Executor是何许人也(io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors)。接下来将重点探究NioEventLoop的类层次结构与其实现。
3、NioEventLoop初探k
NioEventLoop继承自SingleThreadEventLoop,从字面上讲,NioEventLoopGroup的每一个NioEventLoop是一个线程的事件循环器,而每一个NioEventLoop中的执行器(EventExecutor)是一个并发度为nEventLoops的ForkJoinPool。
NioEventLoop的构造方法如下:
这里大家是否想过一个问题,为什么单个线程的EventLoop(SingleThreadEventLoop)的事件循环器,里面需要用一个线程池呢?这里用单个线程当事件循环器有什么作用呢?。我目前的理解是,事件循环器(EventLoop)其实就是一个IO线程,首先使用单个线程来实现,简单高效,没有线程切换开销,多线程访问等问题;并且Netty将一个通道的读、写等操作都绑定到一个相同的事件循环器,这样有利于状态的保存,比如说可以比较方便的在Handler(Handler在IO线程中执行)使用线程本地变量(ThreadLocal)、同时减少线程切换。而使用一个线程池,而不是一个简单的线程,主要是为了提高程序的健壮性,如果单一线程由于异常,导致该线程消亡后,线程池会另起一个新的线程继续提供服务。[但是,从后面的分析看,NioEventLoop中的线程也是轮流执行的。]
接下来,将从SingleThreadEventExecutor,整个线程模型的执行者开始相信接口执行器内部运作逻辑。
4、SingleThreadEventExecutor源码分析
SingleThreadEventExecutor是NioEventLoopGroup的具体执行器,也就是NioEventLoopGroup中运行的线程,其实就是SingleThreadEventExecutor。本文将从重点属性、构造方法、核心方法三方面剖析该类的实现。
4.1 重要属性
private static final int ST_NOT_STARTED = 1; //状态,,,未启动,未启动接受任务 private static final int ST_STARTED = 2; // 已启动,运行 private static final int ST_SHUTTING_DOWN = 3; //关闭中(平滑关闭) private static final int ST_SHUTDOWN = 4; //已关闭 private static final int ST_TERMINATED = 5; // 终止 private final Queue<Runnable> taskQueue; // 该线程模型执行器 任务队列,子类可定制自己的任务队列 @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile Thread thread; // 当前执行器运行的线程。 private final Executor executor; // 具体的线程池,此处为 Netty实现的ForkJoinPool。 private final Semaphore threadLock = new Semaphore(0); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final boolean addTaskWakesUp; //如果设置为true, 当且仅当 添加 一个任务时,才唤醒选择器,比如是否唤醒 select() 函数。 @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile int state = ST_NOT_STARTED; // 当前的状态4.2 构造方法解读
/** * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it. * @param executor the {@link Executor} which will be used for executing. * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up * the executor thread. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent); if (executor == null) { throw new NullPointerException("executor"); } this.addTaskWakesUp = addTaskWakesUp; this.executor = executor; taskQueue = newTaskQueue(); } /** * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant * implementation that does not support blocking operations at all. */ protected Queue<Runnable> newTaskQueue() { return new LinkedBlockingQueue<Runnable>(); }
子类可以通过重写newTaskQueue方法,重写底层的任务队列。
4.3 核心方法
jdk并发包中线程池的实现,比如submit等方法,都是先包装成相关Task,然后调用execute方法。SingleThreadEventExecutor类,最终父类就是Executor,所以,我们从executor方法开始研究。
4.3.1 executor 方法
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); //@1 if (inEventLoop) { addTask(task); //@2 } else { startExecution(); //@3 addTask(task); //@4 if (isShutdown() && removeTask(task)) { //@5 reject(); //@6 } } if (!addTaskWakesUp && wakesUpForTask(task)) { //@7 wakeup(inEventLoop); //@8 } }
代码@1,是否在事件循环中,具体实现如下:
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
怎么解释呢?提交任务的操作,比如需要提交一个读任务,或写任务,如果调用的API的线程是当前的EventLoop,则直接加入到任务队列中等待执行,如果是其他线程调用的,则启动该EventLoop线程进行调度,然后放入到任务队列中。举个简单的例子,在IO线程中(解码出请求信息后),将任务放入到业务队列中去处理的时候,如果调用channel.write方法,此时要注意的是,真实的writer方法的调用,不会在业务线程中调用,因为次数该业务线程并不是EventLoop的执行线程,只会将任务放入到队列,然后业务线程中直接返回(理解这里的异步操作)。
代码@2,如果是EventLoop执行的任务,直接加入任务队列
代码@3,如果是其他线程,则启动调度,稍后进行详细的代码分析。
代码@5,代码@6,如果停止执行,则拒绝服务。
代码@6,7,唤醒选择器,感觉这里addTaskWakesUp 这个变量有点问题,不过具体的唤醒选择器逻辑是对于事件模型来说比较重要,在相关的子类中都有重写。故这部分在研究NioEventLoop时再重点关注。
接下来重点研究步骤3,startExecution方法的实现
4.3.2 startExecution方法详解
private void startExecution() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // @1 schedule(new ScheduledFutureTask<Void>( this, Executors.<Void>callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); // @2 scheduleExecution(); //@3 } } } protected final void scheduleExecution() { updateThread(null); executor.execute(asRunnable); // @4 } private void updateThread(Thread t) { THREAD_UPDATER.lazySet(this, t); //@5 } private final Runnable asRunnable = new Runnable() { //@6 @Override public void run() { updateThread(Thread.currentThread()); // lastExecutionTime must be set on the first run // in order for shutdown to work correctly for the // rare case that the eventloop did not execute // a single task during its lifetime. if (firstRun) { firstRun = false; updateLastExecutionTime(); } try { SingleThreadEventExecutor.this.run(); } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); cleanupAndTerminate(false); } } };
代码@1,该startExecution方法支持多次调用,因为对状态进行了CAS检测并设置,如果启动状态为未启动,并设置为启动中成功,则继续下文的启动流程。
代码@2,新建一个任务,该任务中取消任务队列中的任务,并移除。具体代码下文会重点分析。
代码@3,启动调度器,此处先将当前运行的线程设置为空,代码@5,可以使用lazySet来更新的原因是,对其可见性要求没那么高,因为在添加一个任务的时候,就算检测到当前线程不是EventLoop线程,也就是asRunnable线程,也没关系,会先调用startExecution方法,等其asRunable运行,然后再放入队列中。
代码@4,核心所在呀,这里让SingleThreadEventExecutor真正的名副其实是单个线程。尽管每个EventLoop的执行器是一个并发度为nEventCount的ForkJoinPool线程池。
代码@6,在ForkJoinPool中执行的任务,就是asRunnable中run方法的逻辑,而该方法里面首先先设置当前线程,然后执行SingleThreadEvent的run方法,run方法的具体实现由子类实现。
4.3.3 关于代码@2,关于schedule方法详解
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
整个调度任务的执行过程如下:先将任务封装成ScheduledFutureTask,然后如果当前线程是当前执行器线程,则直接加入到优先级队列中,如果不是,则调用execute方法,由执行器线程加入到调度任务的优先级队列中。这里是Netty线程模型的核心所在,通道的相关IO操作等最终都有由IO线程放入到队列并执行之。避免了多线程的竞争。
5、源码分析NioEventLoop
5.1 NioEventLoop重要属性详解
private static final int CLEANUP_INTERVAL = 256; //取消键的个数超过改造,将cancelKeys清空为0,并执行一次重新选择 private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); // 是否启用选择键优化(SelectionKeys),默认为true private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; // 众所周知,jdk在1.7之前的Selector的select方法会出现空轮询,导致CPU资源紧张,解决 //空轮询最小的空轮训次数,如果SELECTOR_AUTO_REBUILD_THRESHOLD小于该值,则不触发Selector的重建工作。 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD; //如果出现轮询(select),连续多少次未返回准备的键,则触发Selector重建。默认为512 /** * The NIO {@link Selector}. */ Selector selector; // NIO Selector private SelectedSelectionKeySet selectedKeys; // 可选择的键,netty对原生Selector的select()方法返回的键的一个优化集合 private final SelectorProvider provider; /** * Boolean that controls determines if a blocked Selector.select should * break out of its selection process. In our case we use a timeout for * the select method and the select method will block for that time unless * waken up. */ private final AtomicBoolean wakenUp = new AtomicBoolean(); //是否需要执行 selector的wakenup()方法 private volatile int ioRatio = 50; // ioRatio执行比例 private int cancelledKeys; //取消键的个数 private boolean needsToSelectAgain; //是否需要重新select
5.2 核心入口方法 run
继上文的分析,一个任务提交到EventExecutor,首先会先确认是否开始执行(startExecution),在启动调度之前,会运行具体的线程调度处理逻辑run方法里的逻辑。NioEventLoop的间接父类为SingleThreadEventExecutor。
@Override protected void run() { boolean oldWakenUp = wakenUp.getAndSet(false); //@1 try { if (hasTasks()) { // @2 selectNow(); } else { select(oldWakenUp); //@3 // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { // @4 selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { // @51 processSelectedKeys(); //@52 runAllTasks(); //53 } else { //@61 final long ioStartTime = System.nanoTime(); processSelectedKeys();//@62 final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //@63 } if (isShuttingDown()) { //@7 closeAll(); if (confirmShutdown()) { cleanupAndTerminate(true); return; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // TODO: After using a ForkJoinPool that is potentially shared with other software // than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever // happen anyways. // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } scheduleExecution(); //@7 }
代码@1,获取当前的wakeup状态并充值为false
代码@2,如果任务队列中有任务,则执行一次快速selectNow操作,该方法不阻塞。
代码@3,如果任务队列中,没有任务,则执行select方法。select方法,不会阻塞,因为调用的是selectNow或select(超时时间)
代码@4,如果需要weakup,则调用selector的weakup()方法。
代码@5,6是处理具体的任务相关逻辑。相关方法在后文详细讲解。(根据ioRatio的值不同,处理的逻辑不同)
代码@6、关闭流程
代码@7、这里重新向EventExecutor提交任务,再次开始执行select方法
这里的设计,还得琢磨一下,为什么不直接在一个线程中发送执行。
先重点研究一下代码@3处select(wakeup)方法
5.2.1 select(wakeup)方法详解
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // @1 for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } int selectedKeys = selector.select(timeoutMillis); // @2 selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // @3 // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // @4 // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //@5 // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); //@6 selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } }
代码@1,计算select方法应该传入的超时时间,方法主要是从优先级队列(调度队列)中,取第一个节点,计算该任务在多久后应该被调度。
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
代码@2,执行带超时时间的select方法。
代码@3,如果本次有选择出感兴趣的键、或有调度任务处理,则跳出,去执行相关的操作。
代码@4,此处的判断主要是判断是否是空轮询,由于select是带超时时间的,如果没有超过其超时时间,就返回并且没有选择到键,则认为发生了空轮训,然后执行@5的逻辑,如果连续发生空轮询,超过SELECTOR_AUTO_REBUILD_THRESHOLD的值(默认512)次的话,执行重建Selector操作,也就是@6 rebuildSelector()方法执行逻辑。
5.2.2 rebuildSelector 方法详解
/** * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. */ public void rebuildSelector() { if (!inEventLoop()) { execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (;;) { try { for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } } catch (ConcurrentModificationException e) { // Probably due to concurrent modification of the key set. continue; } break; } selector = newSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); }
该方法的实现思路如下:
首先,调用者必须是EventExecutor的当前线程,然后就是新建一个Selector,然后将原来注册在Selector的通道,事件重新注册到新的Selector( selector.keys()),并取消在原Selector上的事件(取消操作非常重要,因为如果不取消,Selector关闭后,注册在Selector上的通道都将关闭)然后关闭旧的Selector以释放相关资源。上述代码就不一一详细解读了。
5.2.3 关于run方法 @52,@62 processSelectedKeys()方法详解
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { //@1 processSelectedKey(k, (AbstractNioChannel) a); } else { //@2 @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (needsToSelectAgain) { //@3 selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }该方法,主要就是遍历选择键,真正对键的处理在代码@1,代码@2中。主要看一下代码@1的处理逻辑吧。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { //@1 // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); //@2 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //@3 unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { //@4 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //@5 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
代码@1,如果key不可用,直接将通道关闭。
代码@2,获取该key已准备就绪的操作事件。
代码@3,判断是否有读事件就绪,如果就绪,则直接调用该Channel的unsafe对象的read操作。如果通道已经关闭,则返回。
代码@4,如果有写事件就绪,调用通道 ch.unsafe().forceFlush()方法,强制进行写操作。
代码@5,如果是连接操作,则将该键的感兴趣的连接事件取消,然后调用finishConnect()实现。
5.2.4 关于run方法 @53,@63 runAllTask详解
/** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. * * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } } /** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. */ protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; } private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }
上述代码就是运行积压在队列中的任务,在获取任务时没有使用队列的阻塞方法,故这些方法最终不会阻塞。
从这里可以更加具体说一下ioRatio这个参数的含义。io执行时间比例,取值为1到100,Netty把处理Selector.selecteKeys的时间,也就是processSelectedKey()方法执行的时间做执行时间的基数,标记为t1时间,从上文的processSelectedKey讲解,其实可以得出t1的时间就绪事件的网络读写时间。另一部分时间是运行任务队列中的时间。如果ioRatio为100的话,表示processSelectedKey,runAllTask时间依次执行,但如果ioRatio设置小于100的话,runAllTask的运行时间将减少(权重),网络事件(select)就绪事件更加容易得到执行,那我们不妨思考一下任务队列中的task是从如何被加入的?一般也是一些读写事件,但不是由IO线程(EventLoop触发的),而是在业务线程中调用网络读写相关API,此时会先加入队列,然后再被调度执行。但这两部分,其实都是IO时间,所以对于ioRatio这个参数,我认为是
运行任务队列的权重更为直观。
总结,本文深入细致的分析了Netty NIO的事件模型,从事件模型的初始化(构造的全过程)、核心属性、构造方法、核心入口方法等方面细致分析了NioEventLoopGroup、SingleThreadEventExecutor的原理。
Netty事件模型:核心思路基于主从Reactor模型,一个NioEventLoopGroup包含n个NioEventLoop,每一个NioEventLoop持有一个Selector和一个线程池(执行器EventExecute,其实是netty
ForkJoinPool,并发度为n),在选择器的NioEventLoop的run方法每次运行后,就会交给NioEventLoop中线程池的另外一个线程,这里的设计,其实我不太明白为什么要这样,一直在一个线程中执行我个人觉得更好。
问题:
不知道netty为什么要这样设计,,在NioEventLoop中,内部会持有一个EventExecutor(ForkjoinPool,并发度为nEventLoops),我开始以为,一个NioEventLoop,只会用到ForkJoinPool中的一个线程,除非那个线程异常退出了后,才会用一个新的线程来提供服务,这样保证健壮性),,但我仔细看源码时,我发现,每一次select后,会重新进行一次scheduleExecution方法调用,这样会使用ForkJoinPool中的另外一个线程,,这样的设计,不利于ThreadLocal的使用,特别是线程本地内存分配,比如同一个通道的读操作,从线程本地变量中分配一个ByteBuf,然后写操作又会用ForkJoinPool的第二个线程,这样的ByteBuf又不会重复使用,为什么会这样设计呢?为什么在EventExecutor的线程池(ForkJoinPool),只使用一个线程,只有当这个线程奔溃后,再切换另一个线程进行处理(线程池自动处理)
相关文章推荐
- Netty线程模型源码分析之NioEventLoopGroup的初始化
- Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析
- Netty源码学习(二)NioEventLoopGroup
- 【Netty4.X】Netty源码分析之NioEventLoopGroup(五)
- Netty源码分析:NioEventLoopGroup
- netty5.0之SingleThreadEventLoop & NioEventLoop
- Netty源码学习(三)NioEventLoop
- 【Netty源码学习】EventLoopGroup
- [netty源码分析]--EventLoopGroup与EventLoop 分析netty的线程模型
- 【Netty4.X】Netty源码分析之NioEventLoop(六)
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- Netty源代码学习——EventLoopGroup原理:NioEventLoopGroup分析
- Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理
- netty NioEventLoopGroup 线程名称设置
- Netty 权威指南笔记(八):EventLoopGroup 和线程模型
- netty 启动分析 NioEventLoopGroup
- 【Netty源码学习】EventLoopGroup
- netty源码分析(一)EventLoopGroup
- netty源码学习二(EventLoopGroup、EventLoop)