您的位置:首页 > 运维架构

Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor

2017-03-21 16:29 1516 查看
1、内存模型总结
     Netty内存模型基于主从Reactor模型;Channel会绑定一个线程模型(EventLoopGroup),与该通道的读,写等事件都在一个EventLoopGroup中执行,避免了Handler执行的线程安全问题。
     线程模型前置篇:
          Nio实现Reactor模式
          图说netty线程模型

2、源码分析NioEventLoopGroup初始化流程
2.1 NioEventLoopGroup构造方法
/**
* Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores
* available, as well as the default {@link Executor} and the {@link SelectorProvider} which
* is returned by {@link SelectorProvider#provider()}.
*
* @see io.netty.util.concurrent.DefaultExecutorServiceFactory
*/
public NioEventLoopGroup() {
this(0);
}

/**
* Create a new instance that uses the default {@link Executor} and the {@link SelectorProvider} which
* is returned by {@link SelectorProvider#provider()}.
*
* @see io.netty.util.concurrent.DefaultExecutorServiceFactory
*
* @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
*                      If {@code executor} is {@code null} this number will also be the parallelism
*                      requested from the default executor. It is generally advised for the number
*                      of {@link EventLoop}s and the number of {@link Thread}s used by the
*/
public NioEventLoopGroup(int nEventLoops) {
this(nEventLoops, (Executor) null);
}

/**
* Create a new instance that uses the the {@link SelectorProvider} which is returned by
* {@link SelectorProvider#provider()}.
*
* @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
*                      If {@code executor} is {@code null} this number will also be the parallelism
*                      requested from the default executor. It is generally advised for the number
*                      of {@link EventLoop}s and the number of {@link Thread}s used by the
*                      {@code executor} to lie very close together.
* @param executor   the {@link Executor} to use, or {@code null} if the default should be used.
*/
public NioEventLoopGroup(int nEventLoops, Executor executor) {
this(nEventLoops, executor, SelectorProvider.provider());
}

/**
* Create a new instance that uses the the {@link SelectorProvider} which is returned by
* {@link SelectorProvider#provider()}.
*
* @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
*                      If {@code executor} is {@code null} this number will also be the parallelism
*
1a6ab
requested from the default executor. It is generally advised for the number
*                      of {@link EventLoop}s and the number of {@link Thread}s used by the
*                      {@code executor} to lie very close together.
* @param executorServiceFactory   the {@link ExecutorServiceFactory} to use, or {@code null} if the default
*                                 should be used.
*/
public NioEventLoopGroup(int nEventLoops, ExecutorServiceFactory executorServiceFactory) {
this(nEventLoops, executorServiceFactory, SelectorProvider.provider());
}

/**
* @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
*                      If {@code executor} is {@code null} this number will also be the parallelism
*                      requested from the default executor. It is generally advised for the number
*                      of {@link EventLoop}s and the number of {@link Thread}s used by the
*                      {@code executor} to lie very close together.
* @param executor  the {@link Executor} to use, or {@code null} if the default should be used.
* @param selectorProvider  the {@link SelectorProvider} to use. This value must not be {@code null}.
*/
public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
super(nEventLoops, executor, selectorProvider);
}

/**
* @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
*                      If {@code executor} is {@code null} this number will also be the parallelism
*                      requested from the default executor. It is generally advised for the number
*                      of {@link EventLoop}s and the number of {@link Thread}s used by the
*                      {@code executor} to lie very close together.
* @param executorServiceFactory   the {@link ExecutorServiceFactory} to use, or {@code null} if the
*                                 default should be used.
* @param selectorProvider  the {@link SelectorProvider} to use. This value must not be {@code null}.
*/
public NioEventLoopGroup(
int nEventLoops, ExecutorServiceFactory executorServiceFactory, final SelectorProvider selectorProvider) {
super(nEventLoops, executorServiceFactory, selectorProvider);
}

重点关注构造方法如下参数:
    1、int nEventLoops                                   EventLoop个数
    2、Executor executor                                任务执行器
    3、SelectorProvider selectorProvider       Nio Selector
并且在使用 EventLoopGroup boosGroup = new EventLoopGroup();时,在未调用其父类构造时,nEventLoops为0, executor为null,selectorProvider为特定平台下的Selector实现类。

2.2 进入到直接父类MultithreadEventLoopGroup构造方法
/**
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
*/
protected MultithreadEventLoopGroup(int nEventLoops, Executor executor, Object... args) {
super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executor, args);
}


该构造方法,只是初始化nEventLoops参数,如果为0,则使用CPU可用核心数的2倍。

2.3 进入到父类MultithreadEventExecutorGroup中,这里是具体初始化的地方
private MultithreadEventExecutorGroup(int nEventExecutors,
Executor executor,
boolean shutdownExecutor,
Object... args) {
if (nEventExecutors <= 0) {
throw new IllegalArgumentException(
String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));
}

if (executor == null) {
executor = newDefaultExecutorService(nEventExecutors); // @1
shutdownExecutor = true;
}

children = new EventExecutor[nEventExecutors];                //@2
if (isPowerOfTwo(children.length)) {                                     //@3
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}

for (int i = 0; i < nEventExecutors; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);                  //@4
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {                                       //@5
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

final boolean shutdownExecutor0 = shutdownExecutor;
final Executor executor0 = executor;
final FutureListener<Object> terminationListener = new FutureListener<Object>() {      //@6
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
if (shutdownExecutor0) {
// This cast is correct because shutdownExecutor0 is only try if
// executor0 is of type ExecutorService.
((ExecutorService) executor0).shutdown();
}
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

代码@1、创建该线程模型的线程执行器,此处返回的是io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors。
代码@2、开始创建该组的工作EventExecutor数组,其中真正存放的实例由代码@4中创建。
代码@3、创建从线程组(EventExecutor)执行器中选择一个执行。
代码@4、创建一个具体的执行器、有具体的线程模型(EventLoopGroup)子类实现。这里是下一步要重点关注的对象,NioEventLoopGroup实例化的对象为NioEventLoop:



代码@5、如果创建执行器失败,则关闭资源。
代码@6、添加相关事件通知处理器。
执行到这里,NioEventLoopGroup的初始化完成,目前可以得出如下
1个NioEventLoopGroup可以有多个 NioEventLoop,轮流接受客户端请求;
同一个NioEventLoopGroup中的NioEventLoop共同持有一个Executor,这个Executor是何许人也(io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors)。接下来将重点探究NioEventLoop的类层次结构与其实现。

3、NioEventLoop初探k
NioEventLoop继承自SingleThreadEventLoop,从字面上讲,NioEventLoopGroup的每一个NioEventLoop是一个线程的事件循环器,而每一个NioEventLoop中的执行器(EventExecutor)是一个并发度为nEventLoops的ForkJoinPool。

NioEventLoop的构造方法如下:



这里大家是否想过一个问题,为什么单个线程的EventLoop(SingleThreadEventLoop)的事件循环器,里面需要用一个线程池呢?这里用单个线程当事件循环器有什么作用呢?。我目前的理解是,事件循环器(EventLoop)其实就是一个IO线程,首先使用单个线程来实现,简单高效,没有线程切换开销,多线程访问等问题;并且Netty将一个通道的读、写等操作都绑定到一个相同的事件循环器,这样有利于状态的保存,比如说可以比较方便的在Handler(Handler在IO线程中执行)使用线程本地变量(ThreadLocal)、同时减少线程切换。而使用一个线程池,而不是一个简单的线程,主要是为了提高程序的健壮性,如果单一线程由于异常,导致该线程消亡后,线程池会另起一个新的线程继续提供服务。[但是,从后面的分析看,NioEventLoop中的线程也是轮流执行的。]
接下来,将从SingleThreadEventExecutor,整个线程模型的执行者开始相信接口执行器内部运作逻辑。

4、SingleThreadEventExecutor源码分析
SingleThreadEventExecutor是NioEventLoopGroup的具体执行器,也就是NioEventLoopGroup中运行的线程,其实就是SingleThreadEventExecutor。本文将从重点属性、构造方法、核心方法三方面剖析该类的实现。
4.1 重要属性
private static final int ST_NOT_STARTED = 1;        //状态,,,未启动,未启动接受任务
private static final int ST_STARTED = 2;                 //   已启动,运行
private static final int ST_SHUTTING_DOWN = 3; //关闭中(平滑关闭)
private static final int ST_SHUTDOWN = 4;       //已关闭
private static final int ST_TERMINATED = 5;    // 终止

private final Queue<Runnable> taskQueue;   // 该线程模型执行器 任务队列,子类可定制自己的任务队列
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile Thread thread;                         // 当前执行器运行的线程。
private final Executor executor;                       // 具体的线程池,此处为 Netty实现的ForkJoinPool。
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;      //如果设置为true, 当且仅当 添加              一个任务时,才唤醒选择器,比如是否唤醒 select() 函数。
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;     // 当前的状态
4.2 构造方法解读
/**
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it.
* @param executor the {@link Executor} which will be used for executing.
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up
* the executor thread.
*/
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean                                                      addTaskWakesUp) {
super(parent);
if (executor == null) {
throw new NullPointerException("executor");
}
this.addTaskWakesUp = addTaskWakesUp;
this.executor = executor;
taskQueue = newTaskQueue();
}
/**
* Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
* {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
* calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
* implementation that does not support blocking operations at all.
*/
protected Queue<Runnable> newTaskQueue() {
return new LinkedBlockingQueue<Runnable>();
}

子类可以通过重写newTaskQueue方法,重写底层的任务队列。
4.3 核心方法
jdk并发包中线程池的实现,比如submit等方法,都是先包装成相关Task,然后调用execute方法。SingleThreadEventExecutor类,最终父类就是Executor,所以,我们从executor方法开始研究。
4.3.1 executor 方法
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();  //@1
if (inEventLoop) {
addTask(task);                                     //@2
} else {
startExecution();                                 //@3
addTask(task);                                   //@4
if (isShutdown() && removeTask(task)) {    //@5
reject();                                                       //@6
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {    //@7
wakeup(inEventLoop);                                               //@8
}
}

代码@1,是否在事件循环中,具体实现如下:
@Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
怎么解释呢?提交任务的操作,比如需要提交一个读任务,或写任务,如果调用的API的线程是当前的EventLoop,则直接加入到任务队列中等待执行,如果是其他线程调用的,则启动该EventLoop线程进行调度,然后放入到任务队列中。举个简单的例子,在IO线程中(解码出请求信息后),将任务放入到业务队列中去处理的时候,如果调用channel.write方法,此时要注意的是,真实的writer方法的调用,不会在业务线程中调用,因为次数该业务线程并不是EventLoop的执行线程,只会将任务放入到队列,然后业务线程中直接返回(理解这里的异步操作)。
代码@2,如果是EventLoop执行的任务,直接加入任务队列
代码@3,如果是其他线程,则启动调度,稍后进行详细的代码分析。
代码@5,代码@6,如果停止执行,则拒绝服务。
代码@6,7,唤醒选择器,感觉这里addTaskWakesUp 这个变量有点问题,不过具体的唤醒选择器逻辑是对于事件模型来说比较重要,在相关的子类中都有重写。故这部分在研究NioEventLoop时再重点关注。
接下来重点研究步骤3,startExecution方法的实现

4.3.2 startExecution方法详解
private void startExecution() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // @1
schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));   // @2
scheduleExecution();   //@3
}
}
}

protected final void scheduleExecution() {
updateThread(null);
executor.execute(asRunnable); // @4
}

private void updateThread(Thread t) {
THREAD_UPDATER.lazySet(this, t);     //@5
}

private final Runnable asRunnable = new Runnable() {  //@6
@Override
public void run() {
updateThread(Thread.currentThread());
// lastExecutionTime must be set on the first run
// in order for shutdown to work correctly for the
// rare case that the eventloop did not execute
// a single task during its lifetime.
if (firstRun) {
firstRun = false;
updateLastExecutionTime();
}
try {
SingleThreadEventExecutor.this.run();
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
cleanupAndTerminate(false);
}
}
};

代码@1,该startExecution方法支持多次调用,因为对状态进行了CAS检测并设置,如果启动状态为未启动,并设置为启动中成功,则继续下文的启动流程。
代码@2,新建一个任务,该任务中取消任务队列中的任务,并移除。具体代码下文会重点分析。
代码@3,启动调度器,此处先将当前运行的线程设置为空,代码@5,可以使用lazySet来更新的原因是,对其可见性要求没那么高,因为在添加一个任务的时候,就算检测到当前线程不是EventLoop线程,也就是asRunnable线程,也没关系,会先调用startExecution方法,等其asRunable运行,然后再放入队列中。
代码@4,核心所在呀,这里让SingleThreadEventExecutor真正的名副其实是单个线程。尽管每个EventLoop的执行器是一个并发度为nEventCount的ForkJoinPool线程池。
代码@6,在ForkJoinPool中执行的任务,就是asRunnable中run方法的逻辑,而该方法里面首先先设置当前线程,然后执行SingleThreadEvent的run方法,run方法的具体实现由子类实现。
4.3.3 关于代码@2,关于schedule方法详解
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}

return task;
}

整个调度任务的执行过程如下:先将任务封装成ScheduledFutureTask,然后如果当前线程是当前执行器线程,则直接加入到优先级队列中,如果不是,则调用execute方法,由执行器线程加入到调度任务的优先级队列中。这里是Netty线程模型的核心所在,通道的相关IO操作等最终都有由IO线程放入到队列并执行之。避免了多线程的竞争。

5、源码分析NioEventLoop
5.1 NioEventLoop重要属性详解
private static final int CLEANUP_INTERVAL = 256;     //取消键的个数超过改造,将cancelKeys清空为0,并执行一次重新选择
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); // 是否启用选择键优化(SelectionKeys),默认为true

private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;   // 众所周知,jdk在1.7之前的Selector的select方法会出现空轮询,导致CPU资源紧张,解决
//空轮询最小的空轮训次数,如果SELECTOR_AUTO_REBUILD_THRESHOLD小于该值,则不触发Selector的重建工作。
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;  //如果出现轮询(select),连续多少次未返回准备的键,则触发Selector重建。默认为512
/**
* The NIO {@link Selector}.
*/
Selector selector;                // NIO Selector
private SelectedSelectionKeySet selectedKeys;      // 可选择的键,netty对原生Selector的select()方法返回的键的一个优化集合

private final SelectorProvider provider;

/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();    //是否需要执行 selector的wakenup()方法

private volatile int ioRatio = 50;   // ioRatio执行比例
private int cancelledKeys;            //取消键的个数
private boolean needsToSelectAgain; //是否需要重新select

5.2 核心入口方法 run
继上文的分析,一个任务提交到EventExecutor,首先会先确认是否开始执行(startExecution),在启动调度之前,会运行具体的线程调度处理逻辑run方法里的逻辑。NioEventLoop的间接父类为SingleThreadEventExecutor。
@Override
protected void run() {
boolean oldWakenUp = wakenUp.getAndSet(false);      //@1
try {
if (hasTasks()) {      // @2
selectNow();
} else {
select(oldWakenUp);    //@3

// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
//    'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
//    'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).

if (wakenUp.get()) {         // @4
selector.wakeup();
}
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {          // @51
processSelectedKeys();  //@52
runAllTasks();                //53
} else { //@61
final long ioStartTime = System.nanoTime();

processSelectedKeys();//@62

final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);    //@63
}

if (isShuttingDown()) {   //@7
closeAll();
if (confirmShutdown()) {
cleanupAndTerminate(true);
return;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);

// TODO: After using a ForkJoinPool that is potentially shared with other software
// than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever
// happen anyways.

// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}

scheduleExecution();     //@7
}

代码@1,获取当前的wakeup状态并充值为false
代码@2,如果任务队列中有任务,则执行一次快速selectNow操作,该方法不阻塞。
代码@3,如果任务队列中,没有任务,则执行select方法。select方法,不会阻塞,因为调用的是selectNow或select(超时时间)
代码@4,如果需要weakup,则调用selector的weakup()方法。
代码@5,6是处理具体的任务相关逻辑。相关方法在后文详细讲解。(根据ioRatio的值不同,处理的逻辑不同)
代码@6、关闭流程
代码@7、这里重新向EventExecutor提交任务,再次开始执行select方法


这里的设计,还得琢磨一下,为什么不直接在一个线程中发送执行。
先重点研究一下代码@3处select(wakeup)方法
5.2.1 select(wakeup)方法详解
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);   // @1
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

int selectedKeys = selector.select(timeoutMillis);   // @2
selectCnt ++;

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {   // @3
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {   // @4
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {      //@5
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);

rebuildSelector();                                                                     //@6
selector = this.selector;

// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}

代码@1,计算select方法应该传入的超时时间,方法主要是从优先级队列(调度队列)中,取第一个节点,计算该任务在多久后应该被调度。

protected long delayNanos(long currentTimeNanos) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            return SCHEDULE_PURGE_INTERVAL;
        }

        return scheduledTask.delayNanos(currentTimeNanos);
    }

代码@2,执行带超时时间的select方法。
代码@3,如果本次有选择出感兴趣的键、或有调度任务处理,则跳出,去执行相关的操作。
代码@4,此处的判断主要是判断是否是空轮询,由于select是带超时时间的,如果没有超过其超时时间,就返回并且没有选择到键,则认为发生了空轮训,然后执行@5的逻辑,如果连续发生空轮询,超过SELECTOR_AUTO_REBUILD_THRESHOLD的值(默认512)次的话,执行重建Selector操作,也就是@6 rebuildSelector()方法执行逻辑。

5.2.2 rebuildSelector 方法详解
/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}

final Selector oldSelector = selector;
final Selector newSelector;

if (oldSelector == null) {
return;
}

try {
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}

int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}

break;
}

selector = newSelector;

try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}

logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}

该方法的实现思路如下:
首先,调用者必须是EventExecutor的当前线程,然后就是新建一个Selector,然后将原来注册在Selector的通道,事件重新注册到新的Selector( selector.keys()),并取消在原Selector上的事件(取消操作非常重要,因为如果不取消,Selector关闭后,注册在Selector上的通道都将关闭)然后关闭旧的Selector以释放相关资源。上述代码就不一一详细解读了。

5.2.3 关于run方法  @52,@62 processSelectedKeys()方法详解
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) {
return;
}

Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();

if (a instanceof AbstractNioChannel) {       //@1
processSelectedKey(k, (AbstractNioChannel) a);
} else {                //@2
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

if (needsToSelectAgain) {     //@3
selectAgain();
selectedKeys = selector.selectedKeys();

// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
该方法,主要就是遍历选择键,真正对键的处理在代码@1,代码@2中。主要看一下代码@1的处理逻辑吧。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {                                                                             //@1
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();   //@2
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {     //@3
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {   //@4
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {   //@5
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924 int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

代码@1,如果key不可用,直接将通道关闭。
代码@2,获取该key已准备就绪的操作事件。
代码@3,判断是否有读事件就绪,如果就绪,则直接调用该Channel的unsafe对象的read操作。如果通道已经关闭,则返回。
代码@4,如果有写事件就绪,调用通道 ch.unsafe().forceFlush()方法,强制进行写操作。
代码@5,如果是连接操作,则将该键的感兴趣的连接事件取消,然后调用finishConnect()实现。

5.2.4 关于run方法 @53,@63 runAllTask详解
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
*
* @return {@code true} if and only if at least one task was run
*/
protected boolean runAllTasks() {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}

for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
return true;
}
}
}

/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

this.lastExecutionTime = lastExecutionTime;
return true;
}

private void fetchFromScheduledTaskQueue() {
if (hasScheduledTasks()) {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
break;
}
taskQueue.add(scheduledTask);
}
}
}

上述代码就是运行积压在队列中的任务,在获取任务时没有使用队列的阻塞方法,故这些方法最终不会阻塞。
从这里可以更加具体说一下ioRatio这个参数的含义。io执行时间比例,取值为1到100,Netty把处理Selector.selecteKeys的时间,也就是processSelectedKey()方法执行的时间做执行时间的基数,标记为t1时间,从上文的processSelectedKey讲解,其实可以得出t1的时间就绪事件的网络读写时间。另一部分时间是运行任务队列中的时间。如果ioRatio为100的话,表示processSelectedKey,runAllTask时间依次执行,但如果ioRatio设置小于100的话,runAllTask的运行时间将减少(权重),网络事件(select)就绪事件更加容易得到执行,那我们不妨思考一下任务队列中的task是从如何被加入的?一般也是一些读写事件,但不是由IO线程(EventLoop触发的),而是在业务线程中调用网络读写相关API,此时会先加入队列,然后再被调度执行。但这两部分,其实都是IO时间,所以对于ioRatio这个参数,我认为是
运行任务队列的权重更为直观。

总结,本文深入细致的分析了Netty NIO的事件模型,从事件模型的初始化(构造的全过程)、核心属性、构造方法、核心入口方法等方面细致分析了NioEventLoopGroup、SingleThreadEventExecutor的原理。
Netty事件模型:核心思路基于主从Reactor模型,一个NioEventLoopGroup包含n个NioEventLoop,每一个NioEventLoop持有一个Selector和一个线程池(执行器EventExecute,其实是netty
ForkJoinPool,并发度为n),在选择器的NioEventLoop的run方法每次运行后,就会交给NioEventLoop中线程池的另外一个线程,这里的设计,其实我不太明白为什么要这样,一直在一个线程中执行我个人觉得更好。

问题:

不知道netty为什么要这样设计,,在NioEventLoop中,内部会持有一个EventExecutor(ForkjoinPool,并发度为nEventLoops),我开始以为,一个NioEventLoop,只会用到ForkJoinPool中的一个线程,除非那个线程异常退出了后,才会用一个新的线程来提供服务,这样保证健壮性),,但我仔细看源码时,我发现,每一次select后,会重新进行一次scheduleExecution方法调用,这样会使用ForkJoinPool中的另外一个线程,,这样的设计,不利于ThreadLocal的使用,特别是线程本地内存分配,比如同一个通道的读操作,从线程本地变量中分配一个ByteBuf,然后写操作又会用ForkJoinPool的第二个线程,这样的ByteBuf又不会重复使用,为什么会这样设计呢?为什么在EventExecutor的线程池(ForkJoinPool),只使用一个线程,只有当这个线程奔溃后,再切换另一个线程进行处理(线程池自动处理)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: