Netty 源码分析(三):服务器端的初始化和注册过程
2015-10-20 23:39
901 查看
1. 简介
接下来我们会通过使用 Netty 去实现 NIO TCP 服务器的这个场景来解析 Netty 的源代码,深入了解 Netty 的设计。使用 Netty 来实现一个 TCP 服务器,我们大致要做以下事情:
创建 ServerSocketChannel、Channel、ChannelHandler 等一系列对象。这里的 Channel 将包含 ServerSocketChannel 和一系列 ChannelHandler。
将 ServerSocketChannel 注册到 Selector 多路复用器上
启动 EventLoop,并将 Channel 注册到其中
为 ServerSocketChannel 绑定端口
接受客户端连接,并将 SocketChannel 注册到 Selector 和 EventLoop 中
处理读写事件
...
2. 源码解析
我们先来了解 Server Channel 的初始化和注册的过程。初始化是来构建 Netty 本身的各种组件,应用用户的设置参数。注册的主要工作最终是将SelectableChannel注册到多路复用器
Selector。这一过程在所有基于 Java NIO 的项目里都是类似的。
服务端的整个构建过程是从
ServerBootstrap开始的:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) .bind(portNum).sync().channel().closeFuture().sync();
接下来我们从
AbstractBootstrap的
bind(int port)方法开始了解 Netty 的源码。
2.1 类:AbstractBootstrap
,方法:ChannelFuture doBind(final SocketAddress localAddress)
bind(int port)的实际工作绝大部分是在
AbstractBootstrap的
ChannelFuture doBind(final SocketAddress localAddress)实现的,我们来看其源码:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
doBind(SocketAddress localAddress)方法主要会做三件事:
ServerChannel 初始化
注册 ServerChannel 到
Selector
将 ServerChannel bind 到本地端口上
最后一步 bind 需要在 register 完成之后再执行,但是因为这些动作都可能发生在不同的线程上,所以 bind 的动作是通过回调的方式实现的,具体细节后面再介绍。本篇将先介绍前两个操作。
因为
AbstractBootstrap类的
initAndRegister()方法是接下来第一个被调用的方法,所以我们接下来看它的源码。
2.2 类:AbstractBootstrap
,方法:ChannelFuture initAndRegister()
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); // 见 2.5 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
这个方法主要做了下面几件事:
Channel channel = channelFactory().newChannel();
此处通过
channelFactory()方法得到的
ChannelFactory实现类是
ReflectiveChannelFactory。(这个类是默认实现,
AbstractBootstrap还提供了方法用来设置其它
ChannelFactory的实现)这个类将通过
channel(NioServerSocketChannel.class)提供的类反射创建出
NioServerSocketChannel。而在
NioServerSocketChannel的构造函数里,一个
java.nio.channels.ServerSocketChannel将被构建出来。
接下来,在
NioServerSocketChannel的父类
AbstractNioChannel的构造函数中,
ServerSocketChannel被设置成非阻塞模式
ch.configureBlocking(false);
在
AbstractNioChannel父类
AbstractChannel的构造函数里,有两个重要的对象被构造出来
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
在这里
unsafe和
pipeline被构建出来。构造出
unsafe的
newUnsafe()方法是在子类中实现,在本例中
NioMessageUnsafe(还有一个类似的
NioByteUnsafe,其为
NioSocketChannel提供了具体的 IO 实现),它包含了很多具体的方法实现。而
DefaultChannelPipeline则是另一个重要的类,接下来我们来看看它的构造函数。
2.3 类 DefaultChannelPipeline
的构造函数
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
DefaultChannelPipeline构造函数中做了一件很重要的事,就是构造了
ChannelHandler链。
TailContext和
HeadContext都继承了
AbstractChannelHandlerContext。另外,
TailContext实现了
ChannelInboundHandler,
HeadContext实现了
ChannelOutboundHandler(Netty 5 在此处有变化,其不再区分
ChannelInboundHandler和
ChannelOutboundHandler)。
上面这些就是
final Channel channel = channelFactory().newChannel();所做的事情。接下来我们来了解
init(Channel channel)方法所做的事情。在这里,
void init(Channel channel)方法由
AbstractBootstrap的子类
ServerBootstrap实现
2.4 类:ServerBootstrap
,方法:void init(Channel channel)
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
这个方法主要做了两件事:
为
NioServerSocketChannel设置 options 和 attrs
为 pipeline 添加一个 Inbound 处理器
ChannelInitializer。这个
ChannelInitializer会在 ChannelRegistered 事件发生时将
ServerBootstrapAcceptor添加到 pipeline 上。在后面
ServerBootstrapAcceptor将被用来接收客户端的连接,我们会在后续文章中介绍。
DefaultChannelPipeline的
addLast方法会将新的
ChannelHandler添加到 tail 之前,其它所有
ChannelHandler之后。
在做完初始化工作之后,就要开始注册的工作了。接下来来看
group().register(channel)的实现。其中
group()方法将会返回我们在
b.group(bossGroup, workerGroup)中设定的
bossGroup这里不做过多介绍了。接下来看
register(channel)方法。
2.5 类:AbstractChannel.AbstractUnsafe
,方法:void register(EventLoop eventLoop, ChannelPromise promise)
因为我们实现的是一个 NIO server,所以此处 EventLoop使用的实现类是
NioEventLoop。
NioEventLoop的
register(Channel)方法是继承自
SingleThreadEventLoop。而
SingleThreadEventLoop则通过
channel.unsafe().register(this, promise)方法将注册工作代理给了
Channel.Unsafe来实现。此处
Unsafe的实现类是
AbstractChannel.AbstractUnsafe(多么一致的命名)。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
先解释一下
eventLoop.inEventLoop(),这个判断在很多地方都能看见。这个方法是用来判断当前的线程是否是
EventLoop的任务执行线程。如果是,那就不用在添加任务,直接执行就可以了,否则需要将任务添加到
EventLoop中。在本例中,很明显,执行过程将走到 else 分支中。
注册工作主要是在
doRegister()方法中实现的,这个方法是定义在
AbstractChannel中的一个抽象方法。在本例中,这个方法由
AbstractNioChannel实现的。
2.6 类:AbstractNioChannel
,方法:doRegister()
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
在这个方法中,
selectionKey = javaChannel().register(eventLoop().selector, 0, this);通过调用 JDK 方法,将
SelectableChannel注册到
Selector上。注意一个细节,因为同一个 Channel 和 Selector 可以对应一个 SelectionKey,所以如果另外一个相应的 SelectionKey 的
cancel方法被执行之后,会导致
SelectableChannel的
register方法抛出
CancelledKeyException。所以这里通过
selectNow()方法清除取消状态之后,重新 register。循环的原因就像是注释所描述的一样,Netty 团队也不清楚,这难道是 JDK 的 bug 吗?
上面这几个过程就实现了 Server Channel 的初始化和注册工作。
3. 总结
3.1 设计模式
从这部分代码,我们可以看到很多设计模式的使用Builder 模式
在使用ServerBootstrap构建服务端的时候,Netty 应用了 Builder 模式。虽不是典型应用,但也起到了是代码简洁易懂的目的。
工厂方法模式
Channel channel = channelFactory().newChannel();使用了工厂方法模式。
ServerBootstrap提供了方法设置工厂类,同时也提供了默认实现。通过工厂方法模式创建
Channel,实现了良好的可扩展性。
模板方法模式
AbstractBootstrap的
void init(Channel channel)就是一个模板方法。初始化和注册工作的主要实现方法是
AbstractBootstrap的
initAndRegister,这个方法调用模板方法
init(Channel)。而模板方法
init(Channel)则交由子类做具体的实现。
同样,
AbstractChannel的
doRegister方法也是一个模板方法模式的例子。
3.2 设计思想
Netty 用上面复杂的代码实现了并不复杂的功能。其背后反映处的思想就是作为一个通用的高性能网络 IO 框架,Netty 必须设计出一个高性能、高扩展性的基础架构,然后再在这个架构之上实现各种功能。Netty 的执行核心是EventLoopGroup及其实现类。绝大部分 IO 操作和非 IO 操作都是交由
EventLoopGroup来执行。这是 Netty 能被用来实现高并发服务的原因之一。所以本文所涉及的操作,所以并不复杂,但是其中的一些操作,例如注册工作,也是需要交由
EventLoopGroup异步执行。这虽然因为异步的方式,提高了系统的执行效率,但事也未任务直接的协调制造了困难,这一点在后续的介绍中会看的很清楚。
如果说
EventLoopGroup是执行调度的核心,那
Channel就是实现具体操作的实现核心。因为网络编程十分复杂,有各种复杂的协议,也有复杂的底层操作系统实现,所以
Channel相应的实现类也是种类繁多。这其实并没有增加复杂度,而是将各种复杂功能各归其主,将实现细节梳理清晰。
4. 接下来
接下来的文章将会解析服务端端口绑定的实现。相关文章推荐
- Netty使用Http上传文件
- tomcat、netty以及nodejs的helloworld性能对比 3ff8
- flatbuffers 和netty的结合使用
- netty 处理远程主机强制关闭一个连接
- 轻量级分布式 RPC 框架
- spark总体概况
- Netty系列之Netty百万级推送服务设计要点
- Netty初步
- Netty ChannelBuffer 简介
- netty4研究系列-序
- netty io.netty.channel 简介1
- spark overview
- Netty4和Netty5内存池的使用心得
- Netty与Reactor模式
- Netty源码分析之DelimiterBasedFrameDecoder
- Netty4学习笔记-001
- Netty4学习笔记-002
- Netty -- 内存管理
- Netty -- Background
- NIO简介