Netty 4源码解析:服务端启动
2015-08-22 11:29
781 查看
Netty 4源码解析:服务端启动
1.基础知识
1.1 Netty 4示例
因为Netty 5还处于测试版,所以选择了目前比较稳定的Netty 4作为学习对象。而且5.0的变化也不像4.0这么大,好多网上的例子都已经过时了。<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.25.Final</version> </dependency>
Netty 4服务端的典型用法如下面代码示例所示,核心组件就是EventLoopGroup、ServerBootstrap、Handler等。其中像EventLoopGroup、Channel等都是可以灵活调配的。这里以比较常用的“主从Reactor”+Nio非阻塞为例,分析代码的执行流程。如果没有接触过Netty的话,建议先简单了解一下Reactor模型等知识再学习源码,不然可能会一头雾水。
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(port) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new XXXHandler()); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
1.2 NIO示例
不管用Netty还是其他网络框架,最终都绕不开JDK NIO提供的接口。那直接用NIO可以分为几步呢?Selector.open():创建当前平台的Selector。
ServerSocketChannel.open():创建服务端的Channel。
bind():绑定到某个端口上。
register():注册Channel和关注的事件到Selector上。
select():拿到已经就绪的事件。
下面就是一段NIO的示例代码,用单线程和一个Selector监控两个Channel的事件。
public static void main(String[] args) throws Exception { Selector selector = Selector.open(); int[] ports = { 1234, 5678 }; for (int port : ports) { ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress("localhost", port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); } while (true) { if (selector.select(3000) == 0) { System.out.print("."); continue; } Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(32)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buffer); // ... } keyIter.remove(); } } }
既然Netty也肯定使用NIO,那么下面分析代码流程时也着重看一下Netty是在哪、如何使用NIO的API。
2.EventLoopGroup预准备
在主流程开始之前,EventLoopGroup构造方法里做了一些预准备的工作。2.1 创建EventLoop组
NioEventLoopGroup继承自MultithreadEventLoopGroup和更上层的MultithreadEventExecutorGroup。其中,EventLoopGroup中指定使用的EventExecutor是NioEventLoop,而MultithreadEventLoopGroup指定了线程数(CPU数*2)和使用的线程工厂是DefaultThreadFactory。注意:SelectorProvider.provider()始终返回第一次调用创建的SelectorProvider,所以这里调用provider()与后面NioServerSocketChannel中再次调用并不冲突。
// NioEventLoopGroup public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } @Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); } // MultithreadEventLoopGroup private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); } protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } @Override protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY); }
利用这两个子类提供的信息,父类MultithreadEventExecutorGroup创建出NioEventLoop组和EventExecutorChooser。
// MultithreadEventExecutorGroup protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nThreads; i ++) { try { children[i] = newChild(threadFactory, args); } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } } }
2.2 EventLoop线程启动
NioEventLoop也是在构造方法中做了很多工作。它的父类SingleThreadEventExecutor会调用刚才NioEventLoopGroup中的线程工厂创建一个线程,并调用NioEventLoop覆写的run()方法。而run()方法中就是最为关键的事件循环代码,它对NioEventLoop构造方法创建的Selector不断的select()出就绪的事件。// SingleThreadEventExecutor protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { thread = threadFactory.newThread(new Runnable() { @Override public void run() { try { SingleThreadEventExecutor.this.run(); } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } } }); taskQueue = newTaskQueue(); } // NioEventLoop NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); provider = selectorProvider; selector = openSelector(); } private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } return selector; } @Override protected void run() { for (;;) { try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); } } }
3.ServerBootstrap主流程
粗看前面的Netty 4代码示例,好像看不出哪里是框架的起点。实际上,当我们调用bind()方法时,这就是整个Netty框架的起点。具体来说,可以分为三步:创建Channel:创建NioServerSocketChannel以及底层NIO的Channel。
初始化Channel:初始化Channel和ChannelPipeline。
注册事件:绑定一个EventLoop到Channel上,并将Channel和关注的SelectionKey注册到Selector上。
绑定端口:绑定到某个监听端口上。
// AbstractBootstrap private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } } final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); } ChannelFuture regFuture = group().register(channel); return regFuture; }
3.1 创建Channel
ServerBoostrap根据我们传入channel()方法的NioServerSocketChannel.class,通过反射创建出Channel对象。注意:NioServerSocketChannel是Netty的包装类。真正的NIO Channel是在其构造方法中通过SelectorProvider创建的。这里Netty没有用之前我们的NIO示例代码中的ServerSocketChannel.open()方法创建Channel,而是使用SelectorProvider。注释里写道是为了避免多个Channel同时创建时open()方法中的竞争条件。
// AbstractBootstrap public B channel(Class<? extends C> channelClass) { return channelFactory(new BootstrapChannelFactory<C>(channelClass)); } private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> { @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } } // NioServerSocketChannel private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the SelectorProvider to open SocketChannel and so remove * condition in SelectorProvider#provider() which is called by * each ServerSocketChannel.open() otherwise. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
传入父类AbstractNioChannel的构造方法后,父类负责设置成了非阻塞模式。
// AbstractNioChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { throw new ChannelException("Failed to enter non-blocking mode.", e); } }
3.2 初始化Channel
创建完Channel后就可以为其做一些配置了。ServerBootstrap的init()方法会配置Channel的参数和属性,并创建ServerBootstrapAcceptor,它真正地持有workerGroup(childGroup)和我们定制的Handler。// ServerBootstrap @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
默认情况下,ChannelPipeline里只有head和tail两个默认的Handler,tail是InBoundHandler,head是OutBoundHandler。真正完成主从Reactor交互的自然就是这里加入到Pipeline的ServerBootstrapAcceptor。
// AbstractChannel protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); } // DefaultChannelPipeline public DefaultChannelPipeline(AbstractChannel channel) { this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
3.3 注册事件
完成了Channel和ChannelPipeline的初始化后,就要为Channel注册我们感兴趣的I/O事件了。尽管NIO的API很简单,但Netty中的注册流程还是比较复杂的:以bossGroup的NioEventLoopGroup.register(Channel)方法为源头
经过由Chooser选取出的NioEventLoop的register(Channel)
最终才委托给Channel的unsafe().register(EventLoop)
首先,NioEventLoopGroup.register()方法会使用next(),借助EventExecutorChooser从EventExecutor数组中选出一个NioEventLoop,并调用其register()方法。
// MultithreadEventExecutorGroup @Override public ChannelFuture register(Channel channel) { return next().register(channel); } // MultithreadEventExecutorGroup @Override public EventExecutor next() { return chooser.next(); } private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; } }
NioEventLoop继承自SingleThreadEventLoop,它的register()方法会调用NioServerSocketChannel的unsafe工具进行注册。
// SingleThreadEventLoop @Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { channel.unsafe().register(this, promise); return promise; }
AbstractChannel中Unsafe匿名类会将传入的NioEventLoop绑定到当前Channel,最终触发doRegister()子方法完成注册工作。同时在注册完成后,Netty会向ChannelPipeline中发送channelRegistered和channelActive通知,这就是我们获得到的Channel通知的源头。
// AbstractChannel.AbstractUnsafe protected abstract class AbstractUnsafe implements Unsafe { /** true if the channel has never been registered, false otherwise */ private boolean neverRegistered = true; @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } } private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; pipeline.fireChannelRegistered(); if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); } } }
最终终于到了真正实现注册的地方:AbstractNioChannel.doRegister()会将底层JDK的ServerSocketChannel注册到当前绑定的eventLoop持有的Selector上。
// AbstractNioChannel @Override protected void doRegister() throws Exception { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { // ... } }
3.4 端口绑定
绑定流程与注册类似,最终都是调用Channel的unsafe()工具类来完成。但区别是注册是从EventLoopGroup开始最终直接调用到Channel,而绑定是从Channel开始,经过了Pipeline中tail和head的处理才调用到Channel的。// AbstractBootstrap private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } }); } // AbstractChannelHandlerContext(TailContext) @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } return promise; } // DefaultChannelPipeline.HeadContext static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler { @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } } // AbstractChannel.AbstractUnsafe protected abstract class AbstractUnsafe implements Unsafe { @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { closeIfClosed(); return; } } } // NioServerSocketChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
4.总结梳理
至此,Netty服务就算是启动完毕,它已经开始监听端口上的请求了。现在就总结一下整个代码流程比较关键的地方。其实这一大片代码看下来,会发现ServerBootstrap和EventLoopGroup都是在互相配合,真正的核心是它们创建出NioEventLoop组和NioServerSocketChannel。每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。相关文章推荐
- R语言之因子分析
- C#实现Winform中打开网页页面的方法
- 关于NSuserdefault存储数据以及存储可变数据时的一些问题
- 黑马程序员-java-多线程
- hdu 1394 Minimum Inversion Number(线段树)
- 每天一个linux命令 : top命令
- Java--排序与查找
- 通过匿名内部类实现对文件的过滤
- JavaScript Array
- DSO Framer _ WinForm 使用
- 汇编语言相关图书推荐
- 前端精选文摘:BFC 神奇背后的原理
- 初入android驱动开发之字符设备(四-中断)
- 叶存菜鸟URL和URLConnection java笔记
- 链队列
- (二)、Android ListView滑动过程中图片显示重复错位闪烁问题解决
- man 命令 的使用方法
- 面试题总结(一)、TCP协议
- opencv中convexHull函数说明
- 把Nodepad++添加进右键菜单