netty源码分析之-服务端启动核心源码分析(5)
2017-11-19 18:19
633 查看
前面分析过引导的作用,这里将分析启动引导也就是对应的客户端或者服务端启动的核心流程
bind方法具体实现如下:
对于①处initAndRegister()方法实现代码:
initAndRegister方法中channelFactory是AbstractBootstrap中持有的成员变量,是我们之前执行:
channel(NioServerSocketChannel.class)进行赋值的,也就是调用来之前分析的关于反射的部分,调用默认的无参构造方法生成对象:
通过SelectorProvider打开一个SocketChannel通道,openServerSocketChannel是对SocketChannel.open()方法阻塞上的一个优化,也就是会创建于一个ServerSocketChannel返回
initAndRegister方法中init(channel)的实现代码:
init方法主要对传入的Channel进行一些通用属性的设置,最后会添加一个特殊的ChannelInitializer来向pipeline中添加一些列用来进行初始化的ChannelHandler,其中ChannelHandler handler = config.handler();可以发现是之前引导初始化传入的handler:
如果有通过handler方法设置一个channelHandler,则在init初始化ServerSocketChannel的时候会添加到对应的pipeline中的最后,对于最后向pipeline中添加的ServerBootstrapAcceptor具体实现如下:
由此可见ServerBootstrapAcceptor是属于InboundHandler的一种,ServerBootstrapAcceptor在channelRead事件触发的时候(也就有客户端连接的时候),把childHandler加到childChannel Pipeline的末尾,设置childHandler的options和attrs,最后把childHandler注册进childGroup
回到doBind方法中doBind0(regFuture, channel, localAddress, promise)实现:
channel.bind的调用绑定对应的SocketAddress到指定的Channel同时一旦完成并通知添加的监听
回到最初的initAndRegister方法中的ChannelFuture regFuture = config().group().register(channel);
可以看到底层的实现也就是java nio中的将Channel注册到Selector上
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
bind方法具体实现如下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); ① final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); ② return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
对于①处initAndRegister()方法实现代码:
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
initAndRegister方法中channelFactory是AbstractBootstrap中持有的成员变量,是我们之前执行:
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
channel(NioServerSocketChannel.class)进行赋值的,也就是调用来之前分析的关于反射的部分,调用默认的无参构造方法生成对象:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); ... public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } ... private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
通过SelectorProvider打开一个SocketChannel通道,openServerSocketChannel是对SocketChannel.open()方法阻塞上的一个优化,也就是会创建于一个ServerSocketChannel返回
initAndRegister方法中init(channel)的实现代码:
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
init方法主要对传入的Channel进行一些通用属性的设置,最后会添加一个特殊的ChannelInitializer来向pipeline中添加一些列用来进行初始化的ChannelHandler,其中ChannelHandler handler = config.handler();可以发现是之前引导初始化传入的handler:
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class). handler(new LoggingHandler(LogLevel.WARN)). childHandler(new MyServerInitializer());
如果有通过handler方法设置一个channelHandler,则在init初始化ServerSocketChannel的时候会添加到对应的pipeline中的最后,对于最后向pipeline中添加的ServerBootstrapAcceptor具体实现如下:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { ... public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } ... }
由此可见ServerBootstrapAcceptor是属于InboundHandler的一种,ServerBootstrapAcceptor在channelRead事件触发的时候(也就有客户端连接的时候),把childHandler加到childChannel Pipeline的末尾,设置childHandler的options和attrs,最后把childHandler注册进childGroup
回到doBind方法中doBind0(regFuture, channel, localAddress, promise)实现:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
channel.bind的调用绑定对应的SocketAddress到指定的Channel同时一旦完成并通知添加的监听
回到最初的initAndRegister方法中的ChannelFuture regFuture = config().group().register(channel);
//在AbstractNioChannel中的: protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
private final SelectableChannel ch; ... protected SelectableChannel javaChannel() { return ch; } ... private Selector unwrappedSelector;
可以看到底层的实现也就是java nio中的将Channel注册到Selector上
相关文章推荐
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- netty源码分析之服务端启动
- [netty源码分析]--服务端启动的工作流程分析
- netty核心源码 二 服务端和客户端启动流程
- MySQL源码分析及核心内幕之4 -- 源码服务端main函数开始及启动流程
- netty源码分析之服务端启动
- Netty4 服务端启动源码分析-线程的创建
- Netty源码分析:服务端启动全过程(篇幅很长)
- netty源码分析之服务端启动全解析
- Netty源码分析之服务端启动过程
- netty源码分析之服务端启动全解析
- MySQL源码分析及核心内幕之4 -- 源码服务端main函数开始及启动流程
- 原理剖析(第 010 篇)Netty之服务端启动工作原理分析(上)
- 【死磕Netty】-----服务端启动过程分析
- zookeeper源码分析之一服务端启动过程
- 【Netty源码分析】Netty服务端bind端口过程
- 【Netty源码分析】客户端connect服务端过程
- Netty 4源码解析:服务端启动
- netty源码分析(十三)Netty核心四大组件关系与构建方式深度解读