您的位置:首页 > Web前端 > BootStrap

Netty源码系列(2)——ServerBootstrap

2017-12-24 19:02 866 查看
先上一段经典使用姿势的样例代码。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


在ServerBootstrap中配置了两组NioEventLoopGroup,如上,bossGroup、workerGroup。这就是netty中主从线程池模型的实现。

1、当服务端要监听一个端口号,以便处理连接该端口号的accept请求,这是需要有NioServerSocketChannel,而该channel是需要注册到eventloop上去的,这时就会从bossGroup中选择一个eventloop;

2、当一个客户端的连接被accept之后,这时,会从workerGroup中选择一个eventloop用来处理该SocketChannel上的所有操作。

另外在配置中,配置了两个handler,一个是通过handler接口配置,一个是通过childHandler配置,两个handler的用途是不一样的。

1、handler是NioServerSocketChannel中的ChannelPipeline中的一个环节;

2、childHandler则是客户端连接过来之后,新建的NioSocketChannel中的ChannelPipeline中的一个环节。

接下来还是先重点分析下bind过程,看如果建立服务端,主要逻辑在doBind方法中,代码如下

private ChannelFuture doBind(final SocketAddress localAddress) {
// 1
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
}


bind过程主要分为两大步骤,channel的初始化和register过程,channel的bind操作,一步步来看。

先看初始化和register的逻辑,代码如下:

final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}

ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}


这部分逻辑在Bootstrap分析中已经分析过,其中init方法是由子类实现的,所以这里进一步看下ServerBootstrap中init方法的实现过程,如下:

void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 1
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 2
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}


相较于Bootstrap的init方法,ServerBootstrap的init方法中,除了将handler添加到了ChannelPipeline当中,还将childHandler封装在了一个ServerBootstrapAcceptor的实例中,然后添加到了channelPipeline当中。后续会分析ServerBootstrapAcceptor这个handler的作用。

接下来看下register成功只有,doBind0方法中的逻辑,如下:

private static void doBind0(
final Channe
dba7
lFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}


与doConnect0的逻辑类似,也是将bind操作封装成一个task,提交给eventloop执行。

bind也是一个outbound事件,所以最终也是调用到head的bind方法中,进而调用到AbstractChannel的bind方法中,如下:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576 if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// 1
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 2
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// 3
safeSetSuccess(promise);
}


1、进行bind操作,就是调用ServerSocket的bind操作;

protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}


2、在bind操作成功后,提交一个task到eventloop,用来执行channelActive事件;

3、在promise中设置操作成功的标识。

至此服务端的NioServerSocketChannel就完成注册与bind操作,接下来就可以接受客户端的连接请求了。

当在服务端的NioServerSocketChannel所在的eventloop监听到有read事件时,执行如下:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}


主要就是执行unsafe的read方法,该方法是由子类实现,在NioServerSocketChannel中,起实现是在AbstractNioMessageChannel中的AbstractNioUnsafe中,代码如下:

try {
for (;;) {
// 1、读取消息
int localRead = doReadMessages(readBuf);
// 数据为空
if (localRead == 0) {
break;
}
// channel已经被关闭
if (localRead < 0) {
closed = true;
break;
}
// 自动读取被设置为false
if (!config.isAutoRead()) {
break;
}
// 已经读取的数据个数大于设定的最大值
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
// 2、对于读取到的每个消息,触发channelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}

readBuf.clear();
// 3、读完后,触发channelReadComplete操作
pipeline.fireChannelReadComplete();


1、循环从channel中读取消息,在以下几种情况下会退出循环

a、读取的数据为空;

b、channel已经被关闭;

c、自动读取被设置为false;

d、已经读取的数据个数大于设定的最大值

2、在退出循环后,会将读取的每个消息,触发channelRead事件

3、在上述操作之后,会触发channelReadComplete事件

在深入看下第一步中的doReadMessages方法中,做了什么。

protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}


可以看到,就是进行了accept操作,然后将得到的SocketChannel构建了一个新的NioSocketChannel实例,然后返回,也就是说,通过doReadMessage方法,就获取到了客户端请求连接成功后的NioSocketChannel对象。

在第二步中,会触发channelRead事件,还记得前面在NioServerSocketChannel的ChannelPipeline中添加的ServerBootstrapAcceptor这个handler吗,看下其对应的channelRead事件的处理过程,如下:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 1
child.pipeline().addLast(childHandler);
// 2
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
// 3
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
// 4
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}


1、将ServerBootstrap在配置时,设置的childHandler添加到新建的NioSocketChannel的ChannelPipeline的链表中;

2、配置tcp的参数;

3、添加配置的attr属性;

4、从childGroup中选择一个eventloop,将child这个NioSocketChannel注册上去,注册过程最终会走到AbstractChannel的register0方法中,这个方法在Bootstrap中分析过,区别就是在与这时候判断isActive时,会返回true,所以注册完后,会触发channelActive事件。

if (isActive()) {
/**
* 如果channel还处于活跃状态,则调用ChannlePipeline的fireChannelActive方法
* 会在head的channelActive中,逐步调用,将感兴趣的事件,添加到selectionKey上去
*/
pipeline.fireChannelActive();
}


至此,netty服务端bind一个端口的过程,以及接受客户端的连接请求的过程就讲完了。

总结如下:

1、netty服务端bind过程分为register和bind两个过程,也是通过异步的方式实现;

2、接受客户端的连接请求,将childHandler添加到与客户端连接的NioSocketChannel的ChannelPipeline中,是在ServerBootstrapAcceptor这个handler中完成的,同时这个对客户端的NioSocketChannel,也是在这里,通过提交一个异步任务,将其注册到相应的eventloop上去的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  源码 netty 服务端