Netty ServerBootstrap如何绑定端口
ServerBootstrap监听端口
接下来带他们通过源码去分析下ServerBootstrap是如何监听端口
源码分析
1. 先看一下启动demo
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync();
2. ServerBootstrap.bind(PORT)
首先从ServerBootstrap.bind(PORT)入手,开始看下他是如何去监听端口,完成Nio底层的一些封装。直接看其抽象类AbstractBootstrap的方法实现
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); // 初始化并且去注册channel final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
3. 我们先分析下initAndRegister()到底干了什么?
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); // 这边是ReflectiveChannelFactory类通过反射去创建我们初始化bootstrap设置的Channel,这里由于我们是服务端,那就是NioServerSocketChannel init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
4. ServerBootstrap类 init方法
分析一下这个init(channel)干了什么
void init(Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); // 设置channelOptions setAttributes(channel, newAttributesArray());// 设置Attributes ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs); // 往ChannelPipeline添加了一个ChannelInitializer,此时channelPipeline里结构为。Head-> ChannelInitializer -> Tail p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
总结一下 init大致就是设置一些配置属性以及添加了一个ChannelInitializer,这个ChannelInitializer看他的方法好像是设置一个ServerBootstrapAcceptor,具体哪里执行这个ChannelInitializer不清楚,带着疑惑我们继续往下看。
5. MultithreadEventLoopGroup类 register方法
回到步骤3我们看下这行代码
ChannelFuture regFuture = config().group().register(channel);
config().group() 这个代码就是通过ServerBootstrapConfig的group()方法去获取我们设置的NioEventLoopGroup(boss线程)
NioEventLoopGroup类的register方法在父类MultithreadEventLoopGroup中实现:
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
MultithreadEventLoopGroup 种next()返回的实例是 SingleThreadEventLoop,因此我们直接看SingleThreadEventLoop的registry方法,通过方法的调用链路最终找到下面这个方法:
@Deprecated @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); ObjectUtil.checkNotNull(channel, "channel"); channel.unsafe().register(this, promise); return promise; }
这下出来了一个新的东西 channel.unsafe(),我们先分析下这个东西返回的是什么,因为我们知道我们的channel是NioServerSocketChannel,所以我们直接去看NioServerSocketChannel的unsafe()方法:
AbstractNioChannel.unsafe() -> AbstractChannel的unsafe变量 -> AbstractNioMessageChannel.newUnsafe() 最终我们可以确定返回的是NioMessageUnsafe;
那我直接看NioMessageUnsafe的register方法,这个方法是在父类AbstractUnsafe中定义
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 将NioServerSocketChannel的eventLoop 绑定到 MultithreadEventLoopGroup的next()返回的eventLoop AbstractChannel.this.eventLoop = eventLoop; // 如果当前线程是eventLoop则直接执行 if (eventLoop.inEventLoop()) { register0(promise); } else { try { // 提交一个eventLoop任务,任务会在EventLoop线程启动后去之行,下面会讲EventLoop线程是如何启动的 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
继续看下AbstractUnsafe的register0方法,(此方法不是立马执行,而是等EventLoop线程启动之后,这边可以顺便分许下这个方法)
private void register0(ChannelPromise promise) { try { //代码省略 doRegister(); // 开始注册,由外部类AbstractChannel实现 } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } //代码省略 if (isActive()) { if (firstRegistration) { // 第一次注册通过处罚ChannelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // 设置感兴趣的事件 beginRead(); } } }
由上图我们找到doRegister方法在AbstractNioChannel中实现,AbstractChannel里仅仅是个空实现,
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 注册java的ServerSocketChannel到EventLoop的Selector上去,并且把当前的netty的channel绑定到java的attachment上去,第二次参数0代表不订阅事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
- 这边注册完成后但是没有完成Accept事件的注册,我们继续研究下是怎么完成Accept事件的注册,通过代码我们得知如果不是第一次注册直接调用AbstractChannel的beginRead()->AbstractNioChannel的doBeginRead(),然后完成注册,
- 第一次调用的话是通过PipeLine触发ChannelActive事件 ,然后调用HeadContext的channelActive方法,然后调用readIfIsAutoRead方法
// Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); }
NioSeverSocketChannel在新建时候初始化到父类AbstractNioChannel是一个SelectionKey.OP_ACCEPT事件,因此这边完成的是连接事件的监听
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
到这里完成了nio ServerSocketChannel selector的注册,
6. EventLoop类 run方法
看到这里有同学有疑问,这个提交任务,但是没有看到哪里启动了EventLoop的线程?带着这个疑惑我们看下eventLoop的execute方法。
@Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); // 判断是不是当前EventLoop线程 addTask(task); // 提交job if (!inEventLoop) { //如果不是在EventLoop线程中 startThread(); // 这个是开启线程吗?下面我会给分析下这个代码 // 移除job的一些操作 if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }
SingleThreadEventExecutor的startThread()这个方法是开启EventLoop的线程(如果线程没有启动的话)
private void startThread() { if (state == ST_NOT_STARTED) { // cas判断下避免多线程开启线程, if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // 开启当前的EventLoop doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
SingleThreadEventExecutor的doStartThread()方法
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { // 其他代码省略。。。 SingleThreadEventExecutor.this.run(); // 其他代码省略。。。 } }); }
接下来我们直接看 SingleThreadEventExecutor.this.run()这个方法,其运行的是子类NioEventLoop类中的run方法:
@Override protected void run() { int selectCnt = 0; for (;;) { //代码省略 这里面大致就是处理IO事件 以及 自定义Job事件 } }
7. ServerBootstrap类 doBind0方法
通过下面我们可以看到,此时像EventLoop线程池中提交了一个Runnable,里面会调用channel.bind(localAddress, promise)去绑定端口
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 绑定ip端口逻辑 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
那我们直接来看下channel.bind(localAddress, promise)具体看了什么,因为是服务端我们知道channel是NioServerSocketChannel,那我们去这里面寻找答案,果然在里面找到了最关键的一个方法,调用了pipeline的bind方法。pipeline默认是DefaultChannelPipeline
@Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
我们继续往下看DefaultChannelPipeline的bind方法,调用了Tail节点的bind方法,然后往Head节点传播
@Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { // 调用tail节点的bind return tail.bind(localAddress, promise); }
tail的bind方法定义在其父类AbstractChannelHandlerContext中
@Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { ObjectUtil.checkNotNull(localAddress, "localAddress"); if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { // 提交job,最终会被EventLoop执行 safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null, false); } return promise; }
这时候我们发现又是提交了一个Runnable去调用下一个的invokeBind方法
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { // 这行实际调用的是Header节点中的bind(this, localAddress, promise) ((ChannelOutboundHandler) handler()).bind()(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } }
直接看HeadContext中的实现方法
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { // 这个unsafe是NioServerScoketChannel中产生的 unsafe.bind(localAddress, promise); }
分析代码在其方法里找到了AbstractUnsafe类最终调用的外部类(NioServerScoketChannel)doBind方法,我们得知道NioServerScoketChannel中肯定存在doBind方法的实现l类
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); // 代码省略 try { //核心代码出现, doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 代码省略 }
NioServerScoketChannel的doBind方法
@SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { //获取java的channel然后开始绑定端口 if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
综上自此完成端口的绑定
总结一下
根据以上源码分析,我们大致能够清晰看到Netty是如何去封装服务端的端口绑定,下面我们总结下主要流程
- 初始化netty channel,设置一些属性,初始化pipeline等操作
- 注册channel
- 绑定channel设置EventLoop
- 将初始化的java channel绑定到EventLoop的selector上
- 启动EventLoop的run方法,用于处理Io事件
- pipeline触发fireChannelActive注册Accept事件
- 执行bind方法,
结束
识别下方二维码!回复:
入群,扫码加入我们交流群!
- 使用Netty绑定一个端口如何分辨出多种类型的DTU的注册包
- 如何解决程序退出重启后不能绑定端口的问题?
- 【Netty源码学习】ServerBootStrap
- Netty源码学习-ServerBootstrap启动及事件处理过程
- java netty之ServerBootstrap的启动
- 如何以非 root 用户将应用绑定到 80 端口-ssh 篇 » 社区 » Ruby China
- 如何解决程序退出重启后不能绑定端口的问题?
- 【Netty源码学习】ServerBootStrap
- Sun Application Server如何设置debug端口
- netty源码分析(三)Netty服务端ServerBootstrap的初始化与反射在其中的应用分析
- netty源码学习一(Serverbootstrap引导程序)
- 如何通过DHCP Snooping配置来实现IP+MAC+端口绑定功能
- selenium server运行时候报错---“端口被占用”,Windows平台如何查看端口占用情况
- APACHE如何里一个站点绑定多个域名?用ServerAlias
- 配置ffserver 的rtsp的端口和服务绑定ip
- linux上客户端绑定固定端口请求server
- 如何编写绑定端口shellcode
- 如何解决飞秋FeiQ绑定端口错误
- 如何通过配置来实现IP+MAC+端口绑定功能
- Dubbo启动报错:io/netty/bootstrap/ServerBootstrap和 Fail to start server(url: dubbo://192.168.137.1:20880