
Netty 4.0.x Source Code Analysis: Bootstrap

2013-09-24 15:47
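Bootstrap means to boot up or to assist. Whenever we write a server or a client program, we first create a bootstrap object and then call its methods to attach the event loops and handlers, so the bootstrap classes are worth analyzing.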

1. Bootstrap class structure

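The bootstrap structure is fairly simple and involves only a few classes and interfaces, as shown in the figure below. Bootstrap is the bootstrap class for client programs, and ServerBootstrap is the bootstrap class for server programs.

[Figure: bootstrap class diagram]
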
2. ServerBootstrap analysis

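This section analyzes ServerBootstrap specifically; the Bootstrap process is much the same, so it is not covered in detail. Below is the typical way we write server-side code, and the whole analysis follows the methods used in this snippet.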

// Configure the bootstrap.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HexDumpProxyInitializer(remoteHost, remotePort))
.childOption(ChannelOption.AUTO_READ, false)
.bind(localPort).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


Let's look at the key code first (note that some of these methods are defined in AbstractBootstrap).

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;

/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
The childGroup and childHandler fields are used to handle the accepted Channels. The group method simply assigns parentGroup and childGroup: parentGroup handles accept events, and childGroup handles the I/O events of the accepted Channels.

// channel() is implemented in the abstract parent class; it stores a factory that later creates the concrete channel object via newInstance().
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}

/**
* {@link ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
*/
@SuppressWarnings("unchecked")
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}

this.channelFactory = channelFactory;
return (B) this;
}

private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;

BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return clazz.getSimpleName() + ".class";
}
}



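The channel method is simple: it wraps the given class in a BootstrapChannelFactory, whose newChannel method later creates the concrete Channel object through newInstance, for example the server-side NioServerSocketChannel.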
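
The deferred, reflection-based creation described above can be sketched with the JDK alone. This is a minimal illustration, not Netty's code: `ReflectiveFactory` is a hypothetical name, and it uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` has been deprecated since Java 9.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for BootstrapChannelFactory, written against the JDK only.
final class ReflectiveFactory<T> implements Supplier<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz; // only the class is stored; nothing is instantiated yet
    }

    @Override
    public T get() {
        try {
            // Class.newInstance() is deprecated since Java 9; calling the no-args
            // constructor explicitly is the modern equivalent.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence first = factory.get();
        CharSequence second = factory.get();
        System.out.println(first.getClass().getSimpleName());
        System.out.println(first != second); // each call builds a fresh instance
    }
}
```

The point of the design is that configuring the bootstrap creates no channel; an instance only appears when the factory is asked for one, which in the article's flow happens during bind.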

/**
* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
The method above assigns the childHandler field of ServerBootstrap.

/**
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
* {@link ChannelOption}.
*/
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
The method above sets options for the accepted channels. A channel has many options, such as the SO_TIMEOUT value, buffer sizes, and so on.
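
The put-or-remove behavior of childOption can be illustrated with a small JDK-only sketch. `ChildOptions` and its nested `Key` are hypothetical names standing in for the bootstrap's option map and `ChannelOption<T>`; the option names are used only as labels.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChildOptions {
    // Hypothetical typed key that plays the role of ChannelOption<T>.
    static final class Key<T> {
        final String name;

        Key(String name) {
            this.name = name;
        }
    }

    private final Map<Key<?>, Object> options = new LinkedHashMap<>();

    // A null value removes the option; any other value stores or overwrites it.
    <T> ChildOptions set(Key<T> key, T value) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        synchronized (options) {
            if (value == null) {
                options.remove(key);
            } else {
                options.put(key, value);
            }
        }
        return this;
    }

    @SuppressWarnings("unchecked")
    <T> T get(Key<T> key) {
        synchronized (options) {
            return (T) options.get(key);
        }
    }

    int size() {
        synchronized (options) {
            return options.size();
        }
    }

    public static void main(String[] args) {
        Key<Boolean> autoRead = new Key<>("AUTO_READ");
        Key<Integer> rcvBuf = new Key<>("SO_RCVBUF");
        ChildOptions opts = new ChildOptions().set(autoRead, false).set(rcvBuf, 65536);
        opts.set(rcvBuf, null); // removes the previously stored value
        System.out.println(opts.size() + " " + opts.get(autoRead));
    }
}
```

The generic method signature is what ties each key to the type of its value, so a caller cannot pair a Boolean option with an Integer value at compile time.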

/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}

/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}

/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(String inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}

/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regPromise = initAndRegister();
final Channel channel = regPromise.channel();
final ChannelPromise promise = channel.newPromise();
if (regPromise.isDone()) {
doBind0(regPromise, channel, localAddress, promise);
} else {
regPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
}
});
}

return promise;
}

private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.

channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
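
The ordering that doBind and doBind0 rely on (register first, then bind, both on the channel's event loop) can be sketched with JDK classes. This is only an analogy under stated assumptions: a single-threaded executor plays the event loop, `CompletableFuture` plays the promise, and `BindAfterRegister` is a hypothetical name. `whenComplete` covers both branches of doBind, since it runs at once if the future is already done and otherwise acts as a listener.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BindAfterRegister {
    // Runs "register" and then "bind" on one loop thread and returns the order in which they ran.
    static List<String> run() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> regFuture =
                    CompletableFuture.runAsync(() -> log.add("register"), loop);
            CompletableFuture<Void> bindPromise = new CompletableFuture<>();

            // Whether registration has already finished or not, the bind step is
            // queued on the same loop thread once the registration outcome is known.
            regFuture.whenComplete((ok, cause) -> loop.execute(() -> {
                if (cause == null) {
                    log.add("bind");
                    bindPromise.complete(null);
                } else {
                    bindPromise.completeExceptionally(cause);
                }
            }));

            bindPromise.get(5, TimeUnit.SECONDS);
            return log;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because both steps run on the same single thread and bind is only queued after registration completes, no extra locking is needed to keep them in order.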



After passing through several layers of bind calls, we finally reach the Channel's bind method. Let's see how the channel handles it; the method is defined in AbstractChannel:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
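The channel's bind method simply calls the pipeline's bind, and the pipeline's bind in turn calls bind on a ChannelHandlerContext, as covered in the earlier analysis of write and flush. So we go straight to the context's bind method, defined as follows: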

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(promise, false);

final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
});
}

return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}


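In the end the Handler's bind method is called. Recall the outbound events mentioned earlier: they come with a default implementation, which for bind is the bind method of HeadHandler. Here is its definition: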

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
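
The path just traced, from the pipeline through each context to the head handler, can be sketched as a tiny doubly linked list. `MiniPipeline` and everything in it are hypothetical names for illustration; unlike Netty's findContextOutbound, this sketch assumes every handler is an outbound handler and so skips nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature pipeline: an outbound operation such as bind starts at
// the tail and is passed backwards until the head performs the real work.
final class MiniPipeline {
    interface OutboundHandler {
        void bind(Context ctx, String address);
    }

    final class Context {
        final String name;
        final OutboundHandler handler;
        Context prev;
        Context next;

        Context(String name, OutboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Pass the bind request to the previous context (one step closer to the head).
        void bind(String address) {
            Context target = prev;
            trace.add(target.name);
            target.handler.bind(target, address);
        }
    }

    final List<String> trace = new ArrayList<>();
    String boundAddress;
    private final Context head;
    private final Context tail;

    MiniPipeline() {
        // The head ends the chain; in the article this is where unsafe.bind is called.
        head = new Context("head", (ctx, address) -> boundAddress = address);
        tail = new Context("tail", null);
        head.next = tail;
        tail.prev = head;
    }

    void addLast(String name, OutboundHandler handler) {
        Context ctx = new Context(name, handler);
        Context last = tail.prev;
        last.next = ctx;
        ctx.prev = last;
        ctx.next = tail;
        tail.prev = ctx;
    }

    void bind(String address) {
        tail.bind(address);
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline();
        p.addLast("a", (ctx, addr) -> ctx.bind(addr));
        p.addLast("b", (ctx, addr) -> ctx.bind(addr));
        p.bind("127.0.0.1:8080");
        System.out.println(p.trace + " -> " + p.boundAddress);
    }
}
```

Handlers added last see the outbound call first, which is why the trace visits b before a and reaches the head at the end.
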
Once again we meet unsafe, the workhorse; the real work is always left to it. Let's look at its bind method:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}

// See: https://github.com/netty/netty/issues/576
if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}

boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
closeIfClosed();
promise.setFailure(t);
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
promise.setSuccess();
}
The code above ends up calling the Channel's doBind method. Our Channel here is NioServerSocketChannel, so its doBind is the one that runs:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
In the end it simply calls bind on the socket of the JDK channel.
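
For reference, the same JDK call can be made without Netty. The sketch below is a standalone illustration: `JdkBind` is a hypothetical name, and binding to port 0 on the loopback address is an assumption made so that the example does not depend on a particular free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class JdkBind {
    // Binds to an ephemeral loopback port and returns the port that was assigned.
    static int bindEphemeral(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            // Same shape as javaChannel().socket().bind(localAddress, backlog)
            ch.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindEphemeral(128));
    }
}
```

The backlog argument is the requested length of the pending-connection queue; the operating system may treat it as a hint.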

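At this point you may find something odd: why has no accept event been registered? A server socket normally registers the accept event with a selector in order to listen for connections. If you spotted this, you understand socket programming ^_^. In fact, while analyzing bind we skipped an important method, initAndRegister. Here is its definition: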

final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}

ChannelPromise regPromise = channel.newPromise();
group().register(channel, regPromise);
if (regPromise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
//    added to the event loop's task queue for later execution.
//    i.e. It's safe to attempt bind() or connect() now:
//         because bind() or connect() will be executed *after* the scheduled registration task is executed
//         because register(), bind(), and connect() are all bound to the same thread.

return regPromise;
}
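Here we see the register method mentioned earlier in the discussion of events; it registers the Channel with an event loop. After several layers of calls, the event loop group ends up in the register method of SingleThreadEventLoop, defined as follows: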

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}

channel.unsafe().register(this, promise);
return promise;
}
Once again there is no escaping the unsafe object. Having seen unsafe so many times, you can guess how this method proceeds, so the code is not listed here.
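
Since the registration code is not listed, here is a hedged JDK-only sketch of what registering a server channel with a selector looks like. It is not Netty's implementation; `SelectorRegistration` is a hypothetical name. It registers with an empty interest set and turns on OP_ACCEPT afterwards, which, as far as I understand, mirrors what the NIO transport does once the channel becomes active, but treat that correspondence as an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class SelectorRegistration {
    // Returns {interest ops right after register, interest ops after enabling accept}.
    static int[] registerThenAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // a channel must be non-blocking to be registered
            SelectionKey key = ch.register(selector, 0); // registered, but interested in nothing yet
            int before = key.interestOps();

            ch.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            key.interestOps(before | SelectionKey.OP_ACCEPT); // now the selector reports new connections

            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenAccept();
        System.out.println(ops[0] + " -> " + ops[1]);
    }
}
```

Separating registration from enabling OP_ACCEPT means the channel can be attached to its loop before it is bound, and only starts reporting connections once it is ready.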

There is also the init method, which deserves an explanation. Its code is as follows:

@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}

final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
It sets up the channel's pipeline and adds a ServerBootstrapAcceptor handler. Looking at the definition of this handler makes its purpose clear.

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;

@SuppressWarnings("unchecked")
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child);
} catch (Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}
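That is the complete code of this handler. It overrides channelRead in order to register each channel accepted by the server with an event loop of childGroup. This explains the role of workerGroup in the server code: this is where it finally comes into play.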
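
The hand-off from the accepting side to the worker side can be sketched with plain executors. This is an analogy rather than Netty's code: `AcceptorHandOff` is a hypothetical name, each single-threaded executor stands in for one worker event loop, connection ids stand in for accepted channels, and the round-robin choice of worker is an assumption about how the group picks a loop.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AcceptorHandOff {
    // Assigns each accepted connection id to a worker loop in round-robin order
    // and returns which worker index ended up owning which connection.
    static Map<String, Integer> dispatch(int workers, String... accepted) {
        ExecutorService[] childGroup = new ExecutorService[workers];
        for (int i = 0; i < workers; i++) {
            childGroup[i] = Executors.newSingleThreadExecutor();
        }
        Map<String, Integer> owner = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(accepted.length);
        try {
            int next = 0;
            for (String child : accepted) { // one iteration per channelRead on the acceptor
                int index = next++ % workers;
                // "Registration" runs on the chosen worker's own thread, not on the acceptor's.
                childGroup[index].execute(() -> {
                    owner.put(child, index);
                    done.countDown();
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
            return owner;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            for (ExecutorService worker : childGroup) {
                worker.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(2, "c1", "c2", "c3", "c4"));
    }
}
```

Once a connection is assigned, all of its later work stays on the same worker thread, which keeps the accepting thread free to do nothing but accept.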

3. Summary

This article walked through the whole ServerBootstrap process. The analysis shows clearly how the bind, register, and accept steps that we normally write by hand in socket server code are handled.