您的位置:首页 > 其它

Netty 源码分析(三):服务器端的初始化和注册过程

2015-10-20 23:39 901 查看

1. 简介

接下来我们会通过使用 Netty 去实现 NIO TCP 服务器的这个场景来解析 Netty 的源代码,深入了解 Netty 的设计。

使用 Netty 来实现一个 TCP 服务器,我们大致要做以下事情:

创建 ServerSocketChannel、Channel、ChannelHandler 等一系列对象。这里的 Channel 将包含 ServerSocketChannel 和一系列 ChannelHandler。

将 ServerSocketChannel 注册到 Selector 多路复用器上

启动 EventLoop,并将 Channel 注册到其中

为 ServerSocketChannel 绑定端口

接受客户端连接,并将 SocketChannel 注册到 Selector 和 EventLoop 中

处理读写事件
...

2. 源码解析

我们先来了解 Server Channel 的初始化和注册的过程。初始化是来构建 Netty 本身的各种组件,应用用户的设置参数。注册的主要工作最终是将
SelectableChannel
注册到多路复用器
Selector
。这一过程在所有基于 Java NIO 的项目里都是类似的。

服务端的整个构建过程是从
ServerBootstrap
开始的:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(channelInitializer)
.bind(portNum).sync().channel().closeFuture().sync();

接下来我们从
AbstractBootstrap
bind(int port)
方法开始了解 Netty 的源码。

2.1 类:
AbstractBootstrap
,方法:
ChannelFuture doBind(final SocketAddress localAddress)

bind(int port)
的实际工作绝大部分是在
AbstractBootstrap
ChannelFuture doBind(final SocketAddress localAddress)
实现的,我们来看其源码:

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}

doBind(SocketAddress localAddress)
方法主要会做三件事:

ServerChannel 初始化

注册 ServerChannel 到
Selector


将 ServerChannel bind 到本地端口上

最后一步 bind 需要在 register 完成之后再执行,但是因为这些动作都可能发生在不同的线程上,所以 bind 的动作是通过回调的方式实现的,具体细节后面再介绍。本篇将先介绍前两个操作。

因为
AbstractBootstrap
类的
initAndRegister()
方法是接下来第一个被调用的方法,所以我们接下来看它的源码。

2.2 类:
AbstractBootstrap
,方法:
ChannelFuture initAndRegister()

final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = group().register(channel); // 见 2.5
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
//    added to the event loop's task queue for later execution.
//    i.e. It's safe to attempt bind() or connect() now:
//         because bind() or connect() will be executed *after* the scheduled registration task is executed
//         because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}

这个方法主要做了下面几件事:

Channel channel = channelFactory().newChannel();

此处通过
channelFactory()
方法得到的
ChannelFactory
实现类是
ReflectiveChannelFactory
。(这个类是默认实现,
AbstractBootstrap
还提供了方法用来设置其它
ChannelFactory
的实现)这个类将通过
channel(NioServerSocketChannel.class)
提供的类反射创建出
NioServerSocketChannel
。而在
NioServerSocketChannel
的构造函数里,一个
java.nio.channels.ServerSocketChannel
将被构建出来。

接下来,在
NioServerSocketChannel
的父类
AbstractNioChannel
的构造函数中,
ServerSocketChannel
被设置成非阻塞模式
ch.configureBlocking(false);


AbstractNioChannel
父类
AbstractChannel
的构造函数里,有两个重要的对象被构造出来

protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}

在这里
unsafe
pipeline
被构建出来。构造出
unsafe
newUnsafe()
方法是在子类中实现,在本例中
NioMessageUnsafe
(还有一个类似的
NioByteUnsafe
,其为
NioSocketChannel
提供了具体的 IO 实现),它包含了很多具体的方法实现。而
DefaultChannelPipeline
则是另一个重要的类,接下来我们来看看它的构造函数。

2.3 类
DefaultChannelPipeline
的构造函数

public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

DefaultChannelPipeline
构造函数中做了一件很重要的事,就是构造了
ChannelHandler
链。
TailContext
HeadContext
都继承了
AbstractChannelHandlerContext
。另外,
TailContext
实现了
ChannelInboundHandler
HeadContext
实现了
ChannelOutboundHandler
(Netty 5 在此处有变化,其不再区分
ChannelInboundHandler
ChannelOutboundHandler
)。

上面这些就是
final Channel channel = channelFactory().newChannel();
所做的事情。接下来我们来了解
init(Channel channel)
方法所做的事情。在这里,
void init(Channel channel)
方法由
AbstractBootstrap
的子类
ServerBootstrap
实现

2.4 类:
ServerBootstrap
,方法:
void init(Channel channel)

void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}

final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}

这个方法主要做了两件事:

NioServerSocketChannel
设置 options 和 attrs

为 pipeline 添加一个 Inbound 处理器
ChannelInitializer
。这个
ChannelInitializer
会在 ChannelRegistered 事件发生时将
ServerBootstrapAcceptor
添加到 pipeline 上。在后面
ServerBootstrapAcceptor
将被用来接收客户端的连接,我们会在后续文章中介绍。
DefaultChannelPipeline
addLast
方法会将新的
ChannelHandler
添加到 tail 之前,其它所有
ChannelHandler
之后。

在做完初始化工作之后,就要开始注册的工作了。接下来来看
group().register(channel)
的实现。其中
group()
方法将会返回我们在
b.group(bossGroup, workerGroup)
中设定的
bossGroup
这里不做过多介绍了。接下来看
register(channel)
方法。

2.5 类:
AbstractChannel.AbstractUnsafe
,方法:
void register(EventLoop eventLoop, ChannelPromise promise)

因为我们实现的是一个 NIO server,所以此处
EventLoop
使用的实现类是
NioEventLoop
NioEventLoop
register(Channel)
方法是继承自
SingleThreadEventLoop
。而
SingleThreadEventLoop
则通过
channel.unsafe().register(this, promise)
方法将注册工作代理给了
Channel.Unsafe
来实现。此处
Unsafe
的实现类是
AbstractChannel.AbstractUnsafe
(多么一致的命名)。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

先解释一下
eventLoop.inEventLoop()
,这个判断在很多地方都能看见。这个方法是用来判断当前的线程是否是
EventLoop
的任务执行线程。如果是,那就不用在添加任务,直接执行就可以了,否则需要将任务添加到
EventLoop
中。在本例中,很明显,执行过程将走到 else 分支中。

注册工作主要是在
doRegister()
方法中实现的,这个方法是定义在
AbstractChannel
中的一个抽象方法。在本例中,这个方法由
AbstractNioChannel
实现的。

2.6 类:
AbstractNioChannel
,方法:
doRegister()

protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

在这个方法中,
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
通过调用 JDK 方法,将
SelectableChannel
注册到
Selector
上。注意一个细节,因为同一个 Channel 和 Selector 可以对应一个 SelectionKey,所以如果另外一个相应的 SelectionKey 的
cancel
方法被执行之后,会导致
SelectableChannel
register
方法抛出
CancelledKeyException
。所以这里通过
selectNow()
方法清除取消状态之后,重新 register。循环的原因就像是注释所描述的一样,Netty 团队也不清楚,这难道是 JDK 的 bug 吗?

上面这几个过程就实现了 Server Channel 的初始化和注册工作。

3. 总结

3.1 设计模式

从这部分代码,我们可以看到很多设计模式的使用

Builder 模式

在使用
ServerBootstrap
构建服务端的时候,Netty 应用了 Builder 模式。虽不是典型应用,但也起到了是代码简洁易懂的目的。

工厂方法模式

Channel channel = channelFactory().newChannel();
使用了工厂方法模式。
ServerBootstrap
提供了方法设置工厂类,同时也提供了默认实现。通过工厂方法模式创建
Channel
,实现了良好的可扩展性。

模板方法模式

AbstractBootstrap
void init(Channel channel)
就是一个模板方法。初始化和注册工作的主要实现方法是
AbstractBootstrap
initAndRegister
,这个方法调用模板方法
init(Channel)
。而模板方法
init(Channel)
则交由子类做具体的实现。

同样,
AbstractChannel
doRegister
方法也是一个模板方法模式的例子。

3.2 设计思想

Netty 用上面复杂的代码实现了并不复杂的功能。其背后反映处的思想就是作为一个通用的高性能网络 IO 框架,Netty 必须设计出一个高性能、高扩展性的基础架构,然后再在这个架构之上实现各种功能。Netty 的执行核心是
EventLoopGroup
及其实现类。绝大部分 IO 操作和非 IO 操作都是交由
EventLoopGroup
来执行。这是 Netty 能被用来实现高并发服务的原因之一。所以本文所涉及的操作,所以并不复杂,但是其中的一些操作,例如注册工作,也是需要交由
EventLoopGroup
异步执行。这虽然因为异步的方式,提高了系统的执行效率,但事也未任务直接的协调制造了困难,这一点在后续的介绍中会看的很清楚。

如果说
EventLoopGroup
是执行调度的核心,那
Channel
就是实现具体操作的实现核心。因为网络编程十分复杂,有各种复杂的协议,也有复杂的底层操作系统实现,所以
Channel
相应的实现类也是种类繁多。这其实并没有增加复杂度,而是将各种复杂功能各归其主,将实现细节梳理清晰。

4. 接下来

接下来的文章将会解析服务端端口绑定的实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty