Netty源码系列(2)——ServerBootstrap
2017-12-24 19:02
866 查看
先上一段经典使用姿势的样例代码。
在ServerBootstrap中配置了两组NioEventLoopGroup,如上,bossGroup、workerGroup。这就是netty中主从线程池模型的实现。
1、当服务端要监听一个端口号,以便处理连接该端口号的accept请求,这是需要有NioServerSocketChannel,而该channel是需要注册到eventloop上去的,这时就会从bossGroup中选择一个eventloop;
2、当一个客户端的连接被accept之后,这时,会从workerGroup中选择一个eventloop用来处理该SocketChannel上的所有操作。
另外在配置中,配置了两个handler,一个是通过handler接口配置,一个是通过childHandler配置,两个handler的用途是不一样的。
1、handler是NioServerSocketChannel中的ChannelPipeline中的一个环节;
2、childHandler则是客户端连接过来之后,新建的NioSocketChannel中的ChannelPipeline中的一个环节。
接下来还是先重点分析下bind过程,看如果建立服务端,主要逻辑在doBind方法中,代码如下
bind过程主要分为两大步骤,channel的初始化和register过程,channel的bind操作,一步步来看。
先看初始化和register的逻辑,代码如下:
这部分逻辑在Bootstrap分析中已经分析过,其中init方法是由子类实现的,所以这里进一步看下ServerBootstrap中init方法的实现过程,如下:
相较于Bootstrap的init方法,ServerBootstrap的init方法中,除了将handler添加到了ChannelPipeline当中,还将childHandler封装在了一个ServerBootstrapAcceptor的实例中,然后添加到了channelPipeline当中。后续会分析ServerBootstrapAcceptor这个handler的作用。
接下来看下register成功只有,doBind0方法中的逻辑,如下:
与doConnect0的逻辑类似,也是将bind操作封装成一个task,提交给eventloop执行。
bind也是一个outbound事件,所以最终也是调用到head的bind方法中,进而调用到AbstractChannel的bind方法中,如下:
1、进行bind操作,就是调用ServerSocket的bind操作;
2、在bind操作成功后,提交一个task到eventloop,用来执行channelActive事件;
3、在promise中设置操作成功的标识。
至此服务端的NioServerSocketChannel就完成注册与bind操作,接下来就可以接受客户端的连接请求了。
当在服务端的NioServerSocketChannel所在的eventloop监听到有read事件时,执行如下:
主要就是执行unsafe的read方法,该方法是由子类实现,在NioServerSocketChannel中,起实现是在AbstractNioMessageChannel中的AbstractNioUnsafe中,代码如下:
1、循环从channel中读取消息,在以下几种情况下会退出循环
a、读取的数据为空;
b、channel已经被关闭;
c、自动读取被设置为false;
d、已经读取的数据个数大于设定的最大值
2、在退出循环后,会将读取的每个消息,触发channelRead事件
3、在上述操作之后,会触发channelReadComplete事件
在深入看下第一步中的doReadMessages方法中,做了什么。
可以看到,就是进行了accept操作,然后将得到的SocketChannel构建了一个新的NioSocketChannel实例,然后返回,也就是说,通过doReadMessage方法,就获取到了客户端请求连接成功后的NioSocketChannel对象。
在第二步中,会触发channelRead事件,还记得前面在NioServerSocketChannel的ChannelPipeline中添加的ServerBootstrapAcceptor这个handler吗,看下其对应的channelRead事件的处理过程,如下:
1、将ServerBootstrap在配置时,设置的childHandler添加到新建的NioSocketChannel的ChannelPipeline的链表中;
2、配置tcp的参数;
3、添加配置的attr属性;
4、从childGroup中选择一个eventloop,将child这个NioSocketChannel注册上去,注册过程最终会走到AbstractChannel的register0方法中,这个方法在Bootstrap中分析过,区别就是在与这时候判断isActive时,会返回true,所以注册完后,会触发channelActive事件。
至此,netty服务端bind一个端口的过程,以及接受客户端的连接请求的过程就讲完了。
总结如下:
1、netty服务端bind过程分为register和bind两个过程,也是通过异步的方式实现;
2、接受客户端的连接请求,将childHandler添加到与客户端连接的NioSocketChannel的ChannelPipeline中,是在ServerBootstrapAcceptor这个handler中完成的,同时这个对客户端的NioSocketChannel,也是在这里,通过提交一个异步任务,将其注册到相应的eventloop上去的。
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
在ServerBootstrap中配置了两组NioEventLoopGroup,如上,bossGroup、workerGroup。这就是netty中主从线程池模型的实现。
1、当服务端要监听一个端口号,以便处理连接该端口号的accept请求,这是需要有NioServerSocketChannel,而该channel是需要注册到eventloop上去的,这时就会从bossGroup中选择一个eventloop;
2、当一个客户端的连接被accept之后,这时,会从workerGroup中选择一个eventloop用来处理该SocketChannel上的所有操作。
另外在配置中,配置了两个handler,一个是通过handler接口配置,一个是通过childHandler配置,两个handler的用途是不一样的。
1、handler是NioServerSocketChannel中的ChannelPipeline中的一个环节;
2、childHandler则是客户端连接过来之后,新建的NioSocketChannel中的ChannelPipeline中的一个环节。
接下来还是先重点分析下bind过程,看如果建立服务端,主要逻辑在doBind方法中,代码如下
private ChannelFuture doBind(final SocketAddress localAddress) { // 1 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 2 final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } }
bind过程主要分为两大步骤,channel的初始化和register过程,channel的bind操作,一步步来看。
先看初始化和register的逻辑,代码如下:
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
这部分逻辑在Bootstrap分析中已经分析过,其中init方法是由子类实现的,所以这里进一步看下ServerBootstrap中init方法的实现过程,如下:
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 1 ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } // 2 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
相较于Bootstrap的init方法,ServerBootstrap的init方法中,除了将handler添加到了ChannelPipeline当中,还将childHandler封装在了一个ServerBootstrapAcceptor的实例中,然后添加到了channelPipeline当中。后续会分析ServerBootstrapAcceptor这个handler的作用。
接下来看下register成功只有,doBind0方法中的逻辑,如下:
private static void doBind0( final Channe dba7 lFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
与doConnect0的逻辑类似,也是将bind操作封装成一个task,提交给eventloop执行。
bind也是一个outbound事件,所以最终也是调用到head的bind方法中,进而调用到AbstractChannel的bind方法中,如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() && Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } // 1 boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 2 if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } // 3 safeSetSuccess(promise); }
1、进行bind操作,就是调用ServerSocket的bind操作;
protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
2、在bind操作成功后,提交一个task到eventloop,用来执行channelActive事件;
3、在promise中设置操作成功的标识。
至此服务端的NioServerSocketChannel就完成注册与bind操作,接下来就可以接受客户端的连接请求了。
当在服务端的NioServerSocketChannel所在的eventloop监听到有read事件时,执行如下:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } }
主要就是执行unsafe的read方法,该方法是由子类实现,在NioServerSocketChannel中,起实现是在AbstractNioMessageChannel中的AbstractNioUnsafe中,代码如下:
try { for (;;) { // 1、读取消息 int localRead = doReadMessages(readBuf); // 数据为空 if (localRead == 0) { break; } // channel已经被关闭 if (localRead < 0) { closed = true; break; } // 自动读取被设置为false if (!config.isAutoRead()) { break; } // 已经读取的数据个数大于设定的最大值 if (readBuf.size() >= maxMessagesPerRead) { break; } } } catch (Throwable t) { exception = t; } setReadPending(false); int size = readBuf.size(); for (int i = 0; i < size; i ++) { // 2、对于读取到的每个消息,触发channelRead事件 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); // 3、读完后,触发channelReadComplete操作 pipeline.fireChannelReadComplete();
1、循环从channel中读取消息,在以下几种情况下会退出循环
a、读取的数据为空;
b、channel已经被关闭;
c、自动读取被设置为false;
d、已经读取的数据个数大于设定的最大值
2、在退出循环后,会将读取的每个消息,触发channelRead事件
3、在上述操作之后,会触发channelReadComplete事件
在深入看下第一步中的doReadMessages方法中,做了什么。
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
可以看到,就是进行了accept操作,然后将得到的SocketChannel构建了一个新的NioSocketChannel实例,然后返回,也就是说,通过doReadMessage方法,就获取到了客户端请求连接成功后的NioSocketChannel对象。
在第二步中,会触发channelRead事件,还记得前面在NioServerSocketChannel的ChannelPipeline中添加的ServerBootstrapAcceptor这个handler吗,看下其对应的channelRead事件的处理过程,如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; // 1 child.pipeline().addLast(childHandler); // 2 for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } // 3 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } // 4 try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
1、将ServerBootstrap在配置时,设置的childHandler添加到新建的NioSocketChannel的ChannelPipeline的链表中;
2、配置tcp的参数;
3、添加配置的attr属性;
4、从childGroup中选择一个eventloop,将child这个NioSocketChannel注册上去,注册过程最终会走到AbstractChannel的register0方法中,这个方法在Bootstrap中分析过,区别就是在与这时候判断isActive时,会返回true,所以注册完后,会触发channelActive事件。
if (isActive()) { /** * 如果channel还处于活跃状态,则调用ChannlePipeline的fireChannelActive方法 * 会在head的channelActive中,逐步调用,将感兴趣的事件,添加到selectionKey上去 */ pipeline.fireChannelActive(); }
至此,netty服务端bind一个端口的过程,以及接受客户端的连接请求的过程就讲完了。
总结如下:
1、netty服务端bind过程分为register和bind两个过程,也是通过异步的方式实现;
2、接受客户端的连接请求,将childHandler添加到与客户端连接的NioSocketChannel的ChannelPipeline中,是在ServerBootstrapAcceptor这个handler中完成的,同时这个对客户端的NioSocketChannel,也是在这里,通过提交一个异步任务,将其注册到相应的eventloop上去的。
相关文章推荐
- Netty源码阅读(一) ServerBootstrap启动
- Netty源码阅读(一) ServerBootstrap启动
- Netty源码学习-ServerBootstrap启动及事件处理过程
- netty源码分析系列——Bootstrap
- 【Netty源码学习】ServerBootStrap
- Netty源码解读------------ServerBootstrap的启动(一)
- 【Netty源码学习】ServerBootStrap
- 【Netty源码学习】ServerBootStrap
- Netty源码分析:ServerBootstrap
- Netty源码-ServerBootStrap服务端启动流程
- Netty4.x 源码实战系列(一): 深入理解ServerBootstrap 与 Bootstrap (1)
- netty源码学习一(Serverbootstrap引导程序)
- UiAutomator系列——Appium Server源码分析之作为Bootstrap客户端(011)
- Netty源码阅读(一) ServerBootstrap启动
- netty源码分析(三)Netty服务端ServerBootstrap的初始化与反射在其中的应用分析
- Netty 源码分析之 一 服务端创建(ServerBootstrap )
- 【Netty源码学习】BootStrap
- Netty系列-服务器端启动源码分析
- Netty 之 netty源码学习之netty server端源码初读(上)
- 一起学Netty(十八)netty源码学习之netty server端源码初读(上)