您的位置:首页 > 运维架构

Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册

2015-04-01 14:34 1246 查看
摘要: 介绍了netty4.0中服务端启动过程中的Channel与EventLoopGroup的注册及服务端线程的启动

启动服务端的代码如下:

public static void main(String[] args) throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SecureChatServerInitializer(sslCtx));

b.bind(PORT).sync().channel().closeFuture().sync();//调用bind方法时启动线程
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

其中ServerBootstrap的bind方法通过调用父类的initAndRegister构造一个Channel,并把Channel注册到EventLoopGroup

final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = group().register(channel);//把channel注册到EventLoopGroup(NioEventLoopGroup)
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

注册代码如下

public ChannelFuture register(Channel channel) {
return next().register(channel);//从EventLoopGroup里获取一个NioEventLoop,并注册channel
}
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}

channel.unsafe().register(this, promise);//注册过程其实是把NioEventLoop(中的selector)注册到channel
return promise;
}

可以看到Channel注册到EventLoopGroup的过程被转换成NioEventLoop(中的selector)注册到channel。

转换后的注册过程如下:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {//启动的时候执行else中的代码
try {
eventLoop.execute(new OneTimeTask() {//这个方法调用回启动
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

启动的时候执行else里的代码,调用eventLoop的execute方法,该方法启动eventLoop中的线程,并把register0作为一个任务,提交给eventLoop中的任务队列。

register0的代码如下:

private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);//SelectableChannel
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

可以看到最终的注册过程是调用jdk的SelectableChannel的register方法,把NioEventLoop的selector注册到了channel上。

下面再看看eventLoop中的线程执行循环,代码如下:

@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks(); //执行任务队列中的任务
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//执行任务队列中的任务
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}

总结一下:

调用ServerBootstrap的bind方法启动服务端的时候,先创建一个Channel对象,然后从NioEventLoopGroup里获取一个NioEventLoop,然后启动NioEventLoop中的线程,同时把注册任务提交到NioEventLoop中的任务队列。NioEventLoop中的线程循环执行,在一定条件下会执行任务队列中的注册任务,从而把NioEventLoop的selector注册到Channel对象。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息