Netty4启动ServerBootStrap源码分析
2017-07-23 12:41
1526 查看
首记:
来分析下Netty4中的核心NIO模型的启动过程, 如何 bind -> accept -> process -> …. 这些过程,在分析之前,先来熟悉下jdk中原生的NIO模型,
因为Netty中的NIO是基于此上面进行封装的。
上面实现一个简单的Nio socket server, 用浏览器可访问
以上面例子说起, 开始初始化了两个线程组 accept, worker
用于 Nio server 的io事件处理线程, 顾名思义,accept 用于ServerSocketChannel accept io事件 ,worker 用于SocketChannel read,write读写io事件,其中线程组的每个线程都对应了唯一一个Selector,用于io事件,也就是线程(其实就是NioEventLoop 与 Selector 一一对应),等会儿可以简单的来看看,因为这不是这篇文章的重点,以后可以专门写篇文章来剖析一下Netty中有关于nio线程对象的封装。
现在来简单的看下
这是所有的构造方法, 如果没有指定线程数, 会由MultithreadEventLoopGroup 默认线程数指定
还可以指定 Executor,SelectorProvider 等等, 不再赘述。
正式进入ServerBootstrap的分析, 继承于抽象父类AbstractBootstrap
用于指定acceptor, worker io线程,
调用父类的方法如下:
指定Channel类型,
这个ReflectiveChannelFactory会在后面的初始化Channel的时候用到.
以上的都不是重点, 核心的部分来了,主要是解决 Netty 如何处理accept 以及之后的SocketChannel的初始化和读写过程。
重新回到initAndRegister()这个过程 ,完了之后,会调用抽象init()方法,有两种实现 ServerBootstrap(服务端) 和Bootstrap (客户端),这里主要是看服务端的过程:
到此就差不多结束,简单的过了一下,篇幅不够 ,要想详细了解其过程,还得看源码。
来分析下Netty4中的核心NIO模型的启动过程, 如何 bind -> accept -> process -> …. 这些过程,在分析之前,先来熟悉下jdk中原生的NIO模型,
因为Netty中的NIO是基于此上面进行封装的。
一,java nio 模型
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; class NioServer implements Runnable { private static final int DEFAULT_SEND_BUF_SIZE = 65535; private static final int DEFAULT_RECV_BUF_SIZE = 65535; private static final int DEFAULT_READ_TIMEOUT = 30000; private static final int DEFAULT_BACKLOG = 1024; private String host; private int port; private int sendBufSize; private int recvBufSize; //ms private int readTimeout; private ServerSocketChannel serverSocketChannel; private Selector selector; //server 正在启动 private AtomicBoolean isStartingUp = new AtomicBoolean(false); //server 运行中 private volatile boolean isRunning = false; //server 正在关闭 private AtomicBoolean isShutdownIng = new AtomicBoolean(false); //server 关闭完成 private volatile boolean isShutdownComplete = false; private InetSocketAddress serverLocalAddress; public NioServer(int port) { this(null, port); } public NioServer(String host, int port) { this(host, port, DEFAULT_SEND_BUF_SIZE, DEFAULT_RECV_BUF_SIZE, DEFAULT_READ_TIMEOUT); } public NioServer(String host, int port, int sendBufSize, int recvBufSize, int readTimeout) { this.host = host; if (port < 0) { throw new IllegalArgumentException("port must be positive"); } this.port = port; this.sendBufSize = sendBufSize > 0 ? sendBufSize : DEFAULT_SEND_BUF_SIZE; this.recvBufSize = recvBufSize > 0 ? recvBufSize : DEFAULT_RECV_BUF_SIZE; this.readTimeout = readTimeout > 0 ? readTimeout : DEFAULT_READ_TIMEOUT; } private void initServerSocket() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setSoTimeout(this.readTimeout); serverSocketChannel.socket().setReceiveBufferSize(this.recvBufSize); serverSocketChannel.socket().setReuseAddress(true); InetSocketAddress address; if (host != null) { address = new InetSocketAddress(host, port); } else { address = new InetSocketAddress(port); } serverSocketChannel.bind(address, DEFAULT_BACKLOG); this.serverLocalAddress = (InetSocketAddress) serverSocketChannel.getLocalAddress(); System.out.printf("******************** Bind Local Address [%s:%d] ************************\n", this.serverLocalAddress.getHostName(), this.serverLocalAddress.getPort()); this.serverSocketChannel = serverSocketChannel; } private void initSelector() throws IOException { this.selector = Selector.open(); } private void init() throws IOException { if (!isStartingUp.compareAndSet(false, true)) { return; } if (isRunning) { return; } initServerSocket(); initSelector(); this.serverSocketChannel.configureBlocking(false); this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT ); this.isRunning = true; this.isStartingUp.compareAndSet(true, false); } @Override public void run() { try { init(); } catch (IOException e) { e.printStackTrace(); } while (this.selector.isOpen()) { int select = 0; try { select = this.selector.select(500); } catch (IOException e) { e.printStackTrace(); } if (select > 0 && isRunning) { Set<SelectionKey> selectionKeys = this.selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (!key.isValid()) { continue; } //accept if (key.isAcceptable()) { try { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.socket().setSendBufferSize(this.sendBufSize); socketChannel.socket().setSoTimeout(this.readTimeout); socketChannel.socket().setReceiveBufferSize(this.recvBufSize); socketChannel.socket().setKeepAlive(true); socketChannel.socket().setReuseAddress(true); socketChannel.socket().setTcpNoDelay(true); socketChannel.configureBlocking(false); InetSocketAddress local = (InetSocketAddress) socketChannel.getLocalAddress(); InetSocketAddress remote = (InetSocketAddress) socketChannel.getRemoteAddress(); System.out.printf("Accept Channel : [%s:%d] -> [%s:%d]\n", remote.getHostName(), remote.getPort(), local.getHostName(), local.getPort()); socketChannel.register(this.selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); if (socketChannel.isOpen()) { try { ByteBuffer buffer = ByteBuffer.allocate(1024); int len = socketChannel.read(buffer); if (len > 0) { System.out.println(new String(buffer.array(), 0, len)); String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "\r\n" + "<html><body>Hello World!</body></html>"; socketChannel.write(ByteBuffer.wrap(httpResponse.getBytes())); } else { key.interestOps(key.interestOps() &~ SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); try { socketChannel.close(); key.cancel(); } catch (IOException e1) { e1.printStackTrace(); } } } } } } } } public void shutdown() { if (!this.isShutdownIng.compareAndSet(false, true)) { return; } if (this.isShutdownComplete) { return; } isRunning = false; try { this.serverSocketChannel.socket().close(); this.serverSocketChannel.close(); this.selector.wakeup(); this.selector.close(); } catch (IOException e) { e.printStackTrace(); System.out.println("Fail to shutdown server ... "); throw new RuntimeException(e); } this.isShutdownComplete = true; this.isShutdownIng.compareAndSet(true, false); System.out.printf("**************** shutdown server[%s:%d] successfully **************\n", this.serverLocalAddress.getHostName(), this.serverLocalAddress.getPort()); } } public class NioDemo { public static void main(String[] args) throws InterruptedException { NioServer server = new NioServer(8888); new Thread(server).start(); Thread.sleep(100000); server.shutdown(); } }
上面实现一个简单的Nio socket server, 用浏览器可访问
http://localhost:8888
二,Netty ServerBootstrap 源码分析
先用netty来简单实现上面例子的功能:import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.net.InetSocketAddress; public class NettyServerDemo { public static void main(String[] args) { EventLoopGroup accept = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(4); ServerBootstrap b = new ServerBootstrap(); b.group(accept, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_RCVBUF, 65535) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { int len = msg.readableBytes(); byte[] bytes = new byte[len]; msg.getBytes(0, bytes); System.out.println(new String(bytes)); String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "\r\n" + "<html><body>Hello World!</body></html>"; ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(httpResponse.getBytes("UTF8")); ctx.writeAndFlush(buffer); } }); } }) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_RCVBUF, 65535) .childOption(ChannelOption.SO_SNDBUF, 65535) .localAddress(8888); ChannelFuture channelFuture = b.bind().awaitUninterruptibly(); if (channelFuture.isSuccess()) { Channel channel = channelFuture.channel(); InetSocketAddress address = (InetSocketAddress) channel.localAddress(); System.out.printf("********* BIND LOCAL ADDRESS [%s:%d] SUCCESSFULLY ********\n", address.getHostName(), address.getPort()); } } }
以上面例子说起, 开始初始化了两个线程组 accept, worker
EventLoopGroup accept = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(4);
用于 Nio server 的io事件处理线程, 顾名思义,accept 用于ServerSocketChannel accept io事件 ,worker 用于SocketChannel read,write读写io事件,其中线程组的每个线程都对应了唯一一个Selector,用于io事件,也就是线程(其实就是NioEventLoop 与 Selector 一一对应),等会儿可以简单的来看看,因为这不是这篇文章的重点,以后可以专门写篇文章来剖析一下Netty中有关于nio线程对象的封装。
现在来简单的看下
NioEventLoopGroup的初始化过程, 不能详细展开 ,不然就写不完了。
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory, final RejectedExecutionHandler rejectedExecutionHandler) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); }
这是所有的构造方法, 如果没有指定线程数, 会由MultithreadEventLoopGroup 默认线程数指定
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
还可以指定 Executor,SelectorProvider 等等, 不再赘述。
正式进入ServerBootstrap的分析, 继承于抽象父类AbstractBootstrap
b.group(accept, worker)
用于指定acceptor, worker io线程,
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
调用父类的方法如下:
public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
指定Channel类型,
.channel(NioServerSocketChannel.class)
AbstractBootstrap.java public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
这个ReflectiveChannelFactory会在后面的初始化Channel的时候用到.
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_RCVBUF, 65535) .option(ChannelOption.SO_BACKLOG, 1024) 用于设置ServerSocketChannel属性
.childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_RCVBUF, 65535) .childOption(ChannelOption.SO_SNDBUF, 65535) 用于设置accept之后建立的SocketChannel属性
.handler(new LoggingHandler(LogLevel.INFO)) 在accept过程链路中加上的 日志handler 这里涉及到ChannelPipeline,ChannelHandler,ChannelHandlerContext这三者的关系,不是这篇的核心重点, 以后可以专门写篇文章详细剖析一下。
.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { int len = msg.readableBytes(); byte[] bytes = new byte[len]; msg.getBytes(0, bytes); System.out.println(new String(bytes)); String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "\r\n" + "<html><body>Hello World!</body></html>"; ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(httpResponse.getBytes("UTF8")); ctx.writeAndFlush(buffer); } }); } }) 在SocketChannel对应的Pipeline中添加业务handler,即对于请求的处理以及相应。
以上的都不是重点, 核心的部分来了,主要是解决 Netty 如何处理accept 以及之后的SocketChannel的初始化和读写过程。
ChannelFuture channelFuture = b.bind().awaitUninterruptibly(); bind方法就是 整个Netty启动的入口,来看看它经过了那些步骤: AbstractBootstrap.java public ChannelFuture bind() { validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { //这行代码就是如何初始化ServerSocketChannel 以及将其注册到accept 线程组 某个线程的Selector上的 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
1. Netty对于java中ServerSocketChannel的初始化,以及注册到seletor上的过程剖析
现在来重点分析下final ChannelFuture regFuture = initAndRegister();这个方法
final ChannelFuture initAndRegister() { Channel channel = null; try { //这行代码就是初始化ServerSocketChannle, 还记得上面channel和channelFactory的设置么? 就是用在这个地方,可以稍微看看 channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); } return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //注册channel 到accept线程组某个线程对应的selector上 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } ReflectiveChannelFactory.java 默认就是用的这个 public T newChannel() { try { //直接通过无参构造方法反射获得类实例 (NioServerSocketChannel.class) return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } NioServerSocketChannel.java 的初始化 public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { try { //获取java中的ServerSocketChannel return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } /** * Create a new instance using the given {@link SelectorProvider}. */ public NioServerSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } /** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { //调用父类的构造方法 设置interest ops SelectionKey.OP_ACCEPT super(null, channel, SelectionKey.OP_ACCEPT); //ServerScoketChannel的配置 等会儿会将上面的option的参数 设置进入javaChannel().socket()(也就是上面newSocket获得的) config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } AbstractNioChannel.java 设置interest ops SelectionKey.OP_ACCEPT 并且设置ch.configureBlocking(false); protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } server register to selector 过程 SingleThreadEventLoop.java public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); //unsafe在channel初始化也就是new的时候就会构造一个对应的,代码在其父类AbstractChannel中 可以稍微看下 promise.channel().unsafe().register(this, promise); return promise; } unsafe 和pipeline 初始化,有兴趣的可以自己看下 protected AbstractChannel(Channel parent, ChannelId id) { this.parent = parent; this.id = id; unsafe = newUnsafe(); pipeline = newChannelPipeline(); } register 注册过程最终是在unsafe中完成的 看看 @Override protected AbstractNioUnsafe newUnsafe() { //NioServerSocketChannel 的unsafe是NioMessageUnsafe return new NioMessageUnsafe(); } 第一步 AbstractUnsafe.java public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //将accept中某个eventLoop绑定到该NioServerSocketChannel上,意味着 一个channel 只能对应一个线程, 而一个线程可以绑定多个channel, 这也就是一个pipeline中的handler只能由某一个线程来处理,也就是不存在多线程竞争的情况,Netty特性之一,体现在这。 AbstractChannel.this.eventLoop = eventLoop; //这里判断就是为了应对上面的特性而设的 只能由channel绑定的线程来处理 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //register doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } AbstractNioChannel.java protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //终于看到了java中熟悉的代码了 至此 server register accept to selector 过程全部结束 对与accept后 的SocketChannel的注册读写事件过程也是大同小异 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
重新回到initAndRegister()这个过程 ,完了之后,会调用抽象init()方法,有两种实现 ServerBootstrap(服务端) 和Bootstrap (客户端),这里主要是看服务端的过程:
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { //这里的set option 就是上面说的对于ServerSocketChannel属性的设置,有兴趣的可以自己看,比较简单,就不展开了 setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //这里很重要 会处理accept 进来的SocketChannel 通过 ServerBootstrapAcceptor这个hanadler p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } 先来看看 server处理SocketChannel的过程 ,等会儿再来看是如何进行accept过程并把获得的SocketChannel发送到该ServerBootstrapAcceptor handler中进行处理的 public void channelRead(ChannelHandlerContext ctx, Object msg) { //接受到的Channel其实是NioSocketChannel 里面封装有java nio中的SocketChannel accept之后获得的 final Channel child = (Channel) msg; //为NioSocketChannel设置netty启动时设置的childHanlder child.pipeline().addLast(childHandler); //为SocketChannel设置child option 属性 setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //向worker 线程组的某个线程的selector register注册io事件, 和上面注册accept io事件大同小异, 不再赘述 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } 接下来看看accept这个过程,因为上面ServerSocketChannel已经注册了accept事件,一旦有客户端accept就绪,就会触发此事件, 可以简单的来看看NioEventLoop中对于selector io 事件的处理过程, 不详细了 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //这里就是上面的NioServerSocketChannel 的NioMessageUnsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //当有客户端accept事件就绪的时候, unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } //unsafe.read(); accept处理过程 private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //RecvByteBufAllocator 读取输入数据存入的ByteBuf分配器 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //accept 入口 看doReadMessages NioServerSocketChannel的实现 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } } protected int doReadMessages(List<Object> buf) throws Exception { //accept 在此 这里很简单 方法里面主逻辑就是serverSocketChannel.accept() SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //把SocketChannel 封装进入NioSocketChannel 在通过上面的read方法 int size = readBuf.size(); //for (int i = 0; i < size; i ++) { //readPending = false; //调用下一个inbound的handler,直到到达ServerBootstrapAcceptor这个handler 中 调用channelRead方法 于是就完成了整个accept过程,还是有点小复杂的啊 //pipeline.fireChannelRead(readBuf.get(i)); //} buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
到此就差不多结束,简单的过了一下,篇幅不够 ,要想详细了解其过程,还得看源码。
相关文章推荐
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- nova-api源码分析(WSGI server的创建及启动)
- SRS(simple-rtmp-server)流媒体服务器源码分析--启动
- 注册中心 Eureka 源码解析 —— Eureka-Server 启动(二)之 EurekaBootStrap
- Netty源码分析之Bootstrap启动过程分析
- Appium Server 源码分析之启动运行Express http服务器
- OpenStack J版 Neutron-server服务加载与启动源码分析(一)
- UiAutomator系列——Appium Android Bootstrap源码分析之启动运行(009)
- 注册中心 Eureka 源码解析 —— Eureka-Server 启动(二)之 EurekaBootStrap
- Appium Server源码分析之作为Bootstrap客户端