您的位置:首页 > 运维架构

Netty的EventLoop

2017-12-30 15:00 155 查看
Netty在启动的时候需要配置相应的NioEventLoopGroup,才能保证当channel进行注册的时候能够注册相应的eventloop,并且保证当channel接收到请求的时候有相应的eventloop交给相应的channelPipeline进行处理。

在NioEventLoopGroup的继承链中,NioEventLoopGroup的构造方法实际在其超类的超类MultithreadEventExecutorGroup中进行。

 
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}

children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

在其构造方法中,首先保证传入的nThread参数大于0,而该参数就是该eventLoopGroup中线程的数量。在构造方法的一开始,就会创建一个SingleThreadEventExecutor数组,顾名思义,这个数组就是存放单个线程的容器,而这个数组的大小也恰恰就是传入的nThread的值。

之后便会在这个数组中一个个通过newChild()方法获得新的NioEventLoop,而NioEventLoop恰恰继承自SingleThreadEventExecutor。

newChild()方法在NioEventLoopGroup中被实现。

protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

在这里,会生成新的NioEventLoop,也就是需要和channel绑定的eventLoop,但在这里只是单纯的创建,所需要的SelectorProvider参数,在一开始的NioEventLoopGroup提供。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}

NioEventLoop的构造方法首先是执行父类的构造方法,其次就是打开selector以便接下来的channel的注册的时候和channel绑定。在其继承链上SingleThreadEventExector给出了更详细的构造方法。

protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}

this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;

thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}

terminationFuture.setSuccess(null);
}
}
}
}
});

taskQueue = newTaskQueue();
}

这里的重点就是在这里生成了一个线程赋给了thread成员,也就是说每一个eventloop都与一个线程绑定,生命周期同步,由此可见,每一个channel对应的处理的线程,恰恰就是这里的thread成员,在线程中,直接调用了该eventLoop的run()方法,这里的run()方法实现在了NioEventLoop当中。

在完成了线程的创建之后,则生成一个新的阻塞链表队列作为正在排队等待完成task队列。

因此,在这里生成的线程的run()方法真正的实现在NioEventLoop当中。

在channel向eventLoopGroup注册的时候,就会打开这里的线程。可以看到注册的时候的代码。

在当channel的注册走到unsafe的时候的register()方法的时候。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

首先,在这里通过给channel的eventLoop的赋值,完成了channel与一个eventLoop的绑定,但此时,eventLoop中的selector还未礽与channel绑定,需要在register0()继续这个操作,但是,这里有一个inEventLoop()方法的判断,这个方法很简单,只是判断当前线程是不是就是eventLoop在构造方法中创建的时候的那个线程,显然,这里的线程应该仍旧是在netty启动中的主线程,显然不是eventLoop所绑定的线程,那么将会调用eventLoop的execute()方法,显然这里的execute()方法与线程池的方法不一样,实现在了SingleThreadEventExecutor里面。

public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

在刚刚的注册channel场景下,这里传入的是是注册task实现的是register0()方法。这里仍旧会通过inEventLoop()方法去判断,但显然结果与刚才一样。那么将会执行startThread()方法,在startThread()中,在构造方法实现的线程终于被开启,而刚刚作为参数传入的注册task也会在开启线程之后交给阻塞队列完成。

那么,就可以把目光放到NioEventLoop对于线程的run()方法的实现,也就是重点。

protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();

processSelectedKeys();

final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);

// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}

在run()方法中是一个死循环,在循环得到一开始,首先会通过hasTasks()判断这时在阻塞队列中是否还有未完成的任务。如果队列中仍旧存在,则会直接调用selectNow,否则会通过select()方法去取得io信息。selectNow()与select()的区别在于selectNow()没有tiemout,及时channel没有已经就绪的信息也会立即返回,这也符合队列中仍旧还有未完成的task任务的场景。而如果阻塞队列已空,则会直接调用nioEventLoop的select()方法。

private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioE
be45
ventLoop.");
}
selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);

rebuildSelector();
selector = this.selector;

// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}

在这里,将会有时间timeout的尝试去取io信息,在不断尝试取得io信息的过程中,一旦取到或者阻塞队列
中又有新的任务或者有了新的定时任务需要执行都会导致select过程的中断。

在完成select()方法之后,回到NioEventLoop的run()方法。

可以看到一个ioRatio参数,表示了执行io信息与执行队列中的task配置的时间百分比。如果配置了100,那么执行队列中的任务会直到处理完信息之后开始,并直到处理完队列中的task之后才会继续尝试去取得select
key,如果不是100,那么将会给执行队伍中task任务的时间设为执行io数据时间的(100- ioRatio)/ioRatio百分比的timeout。

首先看到处理io数据的processSelectedKeys()方法,在processSelectedKeys()方法中,如果在刚刚select()方法中取得到了select
key,那么将会直接进入processSelectedKeyOptimized()方法处理刚刚取得到的select key。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363 selectedKeys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363 for (;;) {
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
i++;
}

selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}

这里会不断循环全部处理接收到的selectKey。并且 通过取得到selectkey得到相应的channel去继续在processSelectedKey()方法中进行处理。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924 int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

这里将是处理取得到的io数据的重点,通过select key取得他的ops来判断这次io请求的目的。如果ops为read或者accept,那么将会直接进入unsafe的read()方法,开始读取去接收到的byte数据。

其他的write或者connect都与read和accept类似,都是直接通过unsafe开始业务逻辑的操作,并通过pipeline开始自己编写的业务逻辑对上述情况的操作。

以上就是eventloop对于io数据的操作。

而在完成io操作之后,将会通过runAllTasks()方法开始处理阻塞队列中的任务。

protected boolean runAllTasks() {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}

for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
return true;
}
}
}

首先会通过fetchFromDelayedQueue()方法中尝试将延时队列中已经超过deadline时间的定时任务从延迟队列中取出,保证定时任务的开启,放入阻塞队列中准备开始执行。

private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}

if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}

if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
}
}
}

然后在没有timeout情况下,将不断从阻塞队列中获取任务,直到将队列中的任务全部处理完成。

以上就是eventloop的功能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: