您的位置:首页 > Web前端 > BootStrap

netty之AbstractBootstrap及其子类源码分析

2016-11-08 22:44 786 查看

netty之AbstractBootstrap及其子类源码分析

参考书籍:netty权威指南第二版

netty版本:5.0.0.Alpha2 https://github.com/netty/netty/tree/netty-5.0.0.Alpha2

ServerBootsrap启动类分析

ServerChannel的启动辅助类,可以方便的创建一个NioServerSocketChannel。

下面我们先来看一个简单的示例代码:

public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
//设置两个线程组
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}


现在来逐个分析函数内部实现:

group函数:

ServerBootstrap中group其实是先调用了父类的group函数,父类的group函数被ServerBootstrap和Bootstrap类共同使用,ServerBootstrap进行了重写,其中parentGroup为io线程组,childGroup为工作线程组。这两个函数都比较简单都只是进行的简单的赋值。

这个函数其实有连个版本一个是一个参数的另一个是两个参数的,一般服务器推荐下面两个参数的版本

子类的实现:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}


父类的实现

public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}


channel函数

这个函数是继承自父类AbstractBootstrap,功能是通过指定channel的类型实例化内部的channel工厂类。该工厂类在bind函数中创建一个channel时会用到。netty的channel很多都是由工厂类产生,早期的接口是io.netty.bootstrapChannelFactory,现在的有io.channel.ChannelFactory接口。在新的5.0.0.Alpha2中最终的实现类是io.netty.channel.ReflectiveChannelFactory。

channel函数

public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}


函数调用了channelFactory函数,传入的参数是个实现了ChannelFactory的具体类。

channelFactory函数

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}

this.channelFactory = channelFactory;
return (B) this;
}


对内部的channel工厂类进行赋值。

ChannelFactory接口

public interface ChannelFactory<T extends Channel> {
/**
* Creates a new channel.
*/
T newChannel();
}


类中仅有一个newChannel方法,改方法用来产生所需的channel。

ReflectiveChannelFactory类

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Class<? extends T> clazz;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}


option函数

这个函数是用来设置与tcp相关的一些参数,如果想要取消设置则可以对相应的参数设置值为null即取消设置。

option函数

public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
//如果值为空则取消设置
synchronized (options) {
options.remove(option);
}
} else {
//设置相应的值
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}


具体的设置选项参考ChannelOption类,常用的设置有

ChannelOption.SO_BACKLOG 用于临时存放已完成三次握手的请求的队列的最大长度

ChannelOption.SO_KEEPALIVE 是否保持心跳机制

SO_TIMEOUT 等待客户连接的超时时间

SO_RCVBUF 接收缓冲区的大小

SO_SNDBUF 发送缓冲区的大小

childHandler和handler函数

这两个函数是用于设置channel的。childHandler,它的目的是添加handler,用来监听已经连接的客户端的Channel的动作和状态,一般服务器端使用,为连接上来的channel设置handle。handler方法,目的是添加一个handler,监听Bootstrap的动作,客户端的Bootstrap中,继承了这一点,一般是客户端的使用,为当前套接字设置handle。

childHandler函数

public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}


handler函数

public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}


这两个函数都比较简单,只是对ServerBootstrap的ChildHandler和Handler类进行了赋值。这在后面的bind函数中调用的初始化程序会用到。下面我们重点分析下bind函数。

bind函数(重点)

这个函数本身代码并不难,只是内部的调用很多,很容易引起混乱所以需要好好整理思路一步步分析。

bind函数

public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}


bind函数内部实际调用了上面的bind函数对将对应端口实例化成InetSocketAddress

doBind函数

private ChannelFuture doBind(final SocketAddress localAddress) {
//此处调用了initAndRegister函数进行初始化和注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// 进人这里就是已经初始化和注册执行成功
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//添加监听器,一旦注册完成会自动调用内部的
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
//初始化和注册失败,设置future的结果
promise.setFailure(cause);
} else {
//初始化和注册成功,设置执行的线程
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}


initAndRegister函数

这个函数初始化channel并将其注册到eventloop上去

final ChannelFuture initAndRegister() {
//此处用到channel的工厂类生成一个channel
final Channel channel = channelFactory().newChannel();
try {
//调用初始化函数
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
//将channel注册到eventloop上去
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

//如果程序走到这一步,那么应该是以下几个情况之一(这两种都是安全的):
//1. 如果我们尝试在eventloop线程中注册,注册已经完成,那么就可以直接执行bind和connect函数,因为注册已经完成
//2. 如果我们尝试在其他线程中注册,注册已经完成,那么执行bind和connnect等函数就会封装成task放到eventloop的任务队列中等待执行

return regFuture;
}


init函数

这个函数是在AbstractBootStrap中定义的抽象函数,由子类实现具体细节,下面是ServerBootstrap中的实现函数。

void init(Channel channel) throws Exception {
//这个map保存option函数的设置的值
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}

final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();
//此处是针对NioServerSocketChannel,如过你有调用过handler函数这里的hendler返回值不为零
if (handler() != null) {
p.addLast(handler());
}

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}


ChannelInitializer类

这个抽象类也是一个handler,用于初始化handler,当channel被注册到eventloop时会被激活

public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
//用户自定义的函数用户初始化
protected abstract void initChannel(C ch) throws Exception;

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
//调用用户自定的函数
initChannel((C) ctx.channel());
//初始化完成,把这个handler从当前pipeline中移出
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
}


ServerBootstrapAcceptor类定义

这个类是ServerBootstrap的一个内部类,用于处理接收到新的channel的handler,代码比较简单。

private static class ServerBootstrapAcceptor extends ChannelHandlerAdapter {

private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
//构造函数
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}

@Override
@SuppressWarnings("unchecked")
//为新接入的SocketChannel初始化
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328 config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}


doBind0函数

最后再来看一下这个函数。这个函数调用了channel的bind方法,实际最后调用的是pipeline的bind事件。

private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}


此处的 【This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up the pipeline in its channelRegistered() implementation.】官方注释不是很理解,从中文的意思就是说这个方法会在channelRegistered事件发生之前触发给用户一个时间去调用注册完成时的处理方法。等以后来解决了 待更!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  源码