netty之AbstractBootstrap及其子类源码分析
2016-11-08 22:44
786 查看
netty之AbstractBootstrap及其子类源码分析
参考书籍:netty权威指南第二版netty版本:5.0.0.Alpha2 https://github.com/netty/netty/tree/netty-5.0.0.Alpha2
ServerBootsrap启动类分析
ServerChannel的启动辅助类,可以方便的创建一个NioServerSocketChannel。下面我们先来看一个简单的示例代码:
public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //设置两个线程组 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
现在来逐个分析函数内部实现:
group函数:
ServerBootstrap中group其实是先调用了父类的group函数,父类的group函数被ServerBootstrap和Bootstrap类共同使用,ServerBootstrap进行了重写,其中parentGroup为io线程组,childGroup为工作线程组。这两个函数都比较简单都只是进行的简单的赋值。这个函数其实有连个版本一个是一个参数的另一个是两个参数的,一般服务器推荐下面两个参数的版本
子类的实现:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
父类的实现
public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
channel函数
这个函数是继承自父类AbstractBootstrap,功能是通过指定channel的类型实例化内部的channel工厂类。该工厂类在bind函数中创建一个channel时会用到。netty的channel很多都是由工厂类产生,早期的接口是io.netty.bootstrapChannelFactory,现在的有io.channel.ChannelFactory接口。在新的5.0.0.Alpha2中最终的实现类是io.netty.channel.ReflectiveChannelFactory。channel函数
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
函数调用了channelFactory函数,传入的参数是个实现了ChannelFactory的具体类。
channelFactory函数
public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return (B) this; }
对内部的channel工厂类进行赋值。
ChannelFactory接口
public interface ChannelFactory<T extends Channel> { /** * Creates a new channel. */ T newChannel(); }
类中仅有一个newChannel方法,改方法用来产生所需的channel。
ReflectiveChannelFactory类
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }
option函数
这个函数是用来设置与tcp相关的一些参数,如果想要取消设置则可以对相应的参数设置值为null即取消设置。option函数
public <T> B option(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { //如果值为空则取消设置 synchronized (options) { options.remove(option); } } else { //设置相应的值 synchronized (options) { options.put(option, value); } } return (B) this; }
具体的设置选项参考ChannelOption类,常用的设置有
ChannelOption.SO_BACKLOG 用于临时存放已完成三次握手的请求的队列的最大长度
ChannelOption.SO_KEEPALIVE 是否保持心跳机制
SO_TIMEOUT 等待客户连接的超时时间
SO_RCVBUF 接收缓冲区的大小
SO_SNDBUF 发送缓冲区的大小
childHandler和handler函数
这两个函数是用于设置channel的。childHandler,它的目的是添加handler,用来监听已经连接的客户端的Channel的动作和状态,一般服务器端使用,为连接上来的channel设置handle。handler方法,目的是添加一个handler,监听Bootstrap的动作,客户端的Bootstrap中,继承了这一点,一般是客户端的使用,为当前套接字设置handle。childHandler函数
public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }
handler函数
public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return (B) this; }
这两个函数都比较简单,只是对ServerBootstrap的ChildHandler和Handler类进行了赋值。这在后面的bind函数中调用的初始化程序会用到。下面我们重点分析下bind函数。
bind函数(重点)
这个函数本身代码并不难,只是内部的调用很多,很容易引起混乱所以需要好好整理思路一步步分析。bind函数
public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
bind函数内部实际调用了上面的bind函数对将对应端口实例化成InetSocketAddress
doBind函数
private ChannelFuture doBind(final SocketAddress localAddress) { //此处调用了initAndRegister函数进行初始化和注册 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // 进人这里就是已经初始化和注册执行成功 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //添加监听器,一旦注册完成会自动调用内部的 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { //初始化和注册失败,设置future的结果 promise.setFailure(cause); } else { //初始化和注册成功,设置执行的线程 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
initAndRegister函数
这个函数初始化channel并将其注册到eventloop上去
final ChannelFuture initAndRegister() { //此处用到channel的工厂类生成一个channel final Channel channel = channelFactory().newChannel(); try { //调用初始化函数 init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //将channel注册到eventloop上去 ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } //如果程序走到这一步,那么应该是以下几个情况之一(这两种都是安全的): //1. 如果我们尝试在eventloop线程中注册,注册已经完成,那么就可以直接执行bind和connect函数,因为注册已经完成 //2. 如果我们尝试在其他线程中注册,注册已经完成,那么执行bind和connnect等函数就会封装成task放到eventloop的任务队列中等待执行 return regFuture; }
init函数
这个函数是在AbstractBootStrap中定义的抽象函数,由子类实现具体细节,下面是ServerBootstrap中的实现函数。
void init(Channel channel) throws Exception { //这个map保存option函数的设置的值 final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); //此处是针对NioServerSocketChannel,如过你有调用过handler函数这里的hendler返回值不为零 if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
ChannelInitializer类
这个抽象类也是一个handler,用于初始化handler,当channel被注册到eventloop时会被激活
public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class); //用户自定义的函数用户初始化 protected abstract void initChannel(C ch) throws Exception; @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { //调用用户自定的函数 initChannel((C) ctx.channel()); //初始化完成,把这个handler从当前pipeline中移出 pipeline.remove(this); ctx.fireChannelRegistered(); success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } } }
ServerBootstrapAcceptor类定义
这个类是ServerBootstrap的一个内部类,用于处理接收到新的channel的handler,代码比较简单。
private static class ServerBootstrapAcceptor extends ChannelHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; //构造函数 ServerBootstrapAcceptor( EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; } @Override @SuppressWarnings("unchecked") //为新接入的SocketChannel初始化 public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } private static void forceClose(Channel child, Throwable t) { child.unsafe().closeForcibly(); logger.warn("Failed to register an accepted channel: " + child, t); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final ChannelConfig config = ctx.channel().config(); if (config.isAutoRead()) { // stop accept new connections for 1 second to allow the channel to recover // See https://github.com/netty/netty/issues/1328 config.setAutoRead(false); ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { config.setAutoRead(true); } }, 1, TimeUnit.SECONDS); } // still let the exceptionCaught event flow through the pipeline to give the user // a chance to do something with it ctx.fireExceptionCaught(cause); } }
doBind0函数
最后再来看一下这个函数。这个函数调用了channel的bind方法,实际最后调用的是pipeline的bind事件。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
此处的 【This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up the pipeline in its channelRegistered() implementation.】官方注释不是很理解,从中文的意思就是说这个方法会在channelRegistered事件发生之前触发给用户一个时间去调用注册完成时的处理方法。等以后来解决了 待更!
相关文章推荐
- Map及其子类源码简单分析以及性能比较
- 自顶向下深入分析Netty(三)--Bootstrap源码分析
- netty4.0.x源码分析—bootstrap
- Netty源码分析之Bootstrap启动过程分析
- Netty 源码分析之 一 客户端创建(Bootstrap )
- Netty源码分析:AbstractByteBuf
- netty源码分析(三)Netty服务端ServerBootstrap的初始化与反射在其中的应用分析
- Netty3 源码分析 - ClientBootstrap
- Netty 源码分析之 一 服务端创建(ServerBootstrap )
- Netty源码分析:ServerBootstrap
- java I/O FilterInputStream及其子类源码分析
- netty源码分析(六)Reactor模式透彻理解及其在Netty中的应用
- Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
- netty源码分析系列——Bootstrap
- netty4.0.x源码分析—bootstrap
- netty4.0.x源码分析—bootstrap
- Netty源码分析之一【揭开Bootstrap神秘的红盖头】
- [zz]DEFAULT_KEYS_SHORTCUT 功能的验证 及其 源码实现分析
- nginx源码分析—模块及其初始化
- ARM汇编中的ldr和adr的区别及其在uboot中相关源码的分析