Netty源码分析之客户端启动过程
2016-08-22 20:28
1326 查看
一、先来看一下客户端示例代码。
二、启动过程分析
由于客户端Bootstrap的配置过程和服务端ServerBootstrap配置过程原理相类似,此处不再单独讲解客户端的配置过程。接下来直接看客户端的connect过程。
三、connect过程分析
ChannelFuture f = b.connect(host, port).sync();
connect代码如下:
继续深入
继续查看doConnect源码
看一下initAndRegister代码
首先看一下针对客户端的init代码。
接下来看register过程,这个和服务端是一样的。(ChannelFuture regFuture = group().register(channel);)
继续看register0代码
看doRegister代码
initAndRegister执行完成之后,继续看doConnect0代码
继续看connect代码,简单的调用了pipeline.connect
从tail开始
最终会调用到head.connect()
客户端的isActive()
服务端的isActive()
看一下doConnect代码
[b]四、看一下如何获取异步连接结果的[/b]
在NioEventLoop的循环中,可以看到如下代码:
当发生OP_CONNECT事件时,最终会调用unsafe.finishConnect,代码如下
判断JDK的SocketChannel连接结果,返回true表示连接成功
fulfillConnectPromise会出发链接激活事件
五、write过程
由于在服务端启动过程中已经多次分析了channel的read执行过程,因此在这里单独分析一下channel的write过程。首先看一下channe接口中关于write方法的定义:
其在AbstractChannel中的实现为:
继续深入
事件进入pipeline之后,会从tail context开始向前传播(因为write是个outbound事件)
继续
继续
看一下findContextOutbound的实现
找到下一个OutBound类型的Context之后,会调用Context中的Handler
继续看handler的write实现
可以看到,默认的实现是将事件继续沿pipeline向前传播,最终会传到head Context
unsafe会执行真正的IO操作
可以看到,unsafe的write操作并不是真正的将数据发送出去,而是在环形缓冲区中进行缓存。当channel调用flush时,最终会执行unsafe的flush
addFlush仅仅是对之前缓存的Message进行标记
接下来看一下真正的flush操作
doWrite执行真正的写操作
public class NettyClientTest { public void connect(int port, String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup();//与服务端不同,客户端只需要一个IO线程组 try { Bootstrap b = new Bootstrap(); b.group(group) .option(ChannelOption.TCP_NODELAY, true)//禁用nagel算法 .channel(NioSocketChannel.class)//设置channel类型为NioSocketChannel .handler(new ChannelInitializer<SocketChannel>() {//为channel设置初始化Handler @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync();//等不等待连接结束 f.channel().closeFuture().sync();//同步等待关闭 }finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ int port = 8082; new NettyClientTest().connect(port,"127.0.0.1"); } } class EchoClientHandler extends ChannelInboundHandlerAdapter{ private int count = 0; static final String ECHO_REQ = "HI , MY NAME IS CHENYANG.$_"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for(int i = 0;i < 10;i++){ ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("This is"+ ++count + "times receive server:[" + msg + "]"); ctx.writeAndFlush(Unpooled.copiedBuffer("hehe.$_".getBytes())); ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
二、启动过程分析
由于客户端Bootstrap的配置过程和服务端ServerBootstrap配置过程原理相类似,此处不再单独讲解客户端的配置过程。接下来直接看客户端的connect过程。
三、connect过程分析
ChannelFuture f = b.connect(host, port).sync();
connect代码如下:
/** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(String inetHost, int inetPort) { return connect(new InetSocketAddress(inetHost, inetPort)); }
继续深入
/** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doConnect(remoteAddress, localAddress()); }
继续查看doConnect源码
/** * @see {@link #connect()} */ private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//与服务端的类似,负责初始化和注册这个channel final Channel channel = regFuture.channel();//获得创建的channel if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//连接 } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; }
看一下initAndRegister代码
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel();//调用之前设置的channel工厂,创建channel,此处就是NioSocketChannel try { init(channel);//初始化这个channel,这个针对客户端和服务端是不同的 } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel);//向NioEventLoopGroup中注册这个channel if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
首先看一下针对客户端的init代码。
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler());//设置用户添加的handler,也就是初始化的handler final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { for (Entry<ChannelOption<?>, Object> e: options.entrySet()) { try { if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {//设置channel的配置选项 logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + channel, t); } } } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());//设置channel的属性 } } }
接下来看register过程,这个和服务端是一样的。(ChannelFuture regFuture = group().register(channel);)
@Override public ChannelFuture register(Channel channel) { return next().register(channel);//next()会在Group中选出下一个NioEventLoop }
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); }
@Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise);//unsafe中执行真正的注册操作 return promise; }
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop;//设置该channel绑定的eventloop if (eventLoop.inEventLoop()) {//必须保证在eventloop线程中执行 register0(promise);//注册 } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
继续看register0代码
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister();//在selector上注册 neverRegistered = false; registered = true;//设置已经注册标识 safeSetSuccess(promise);//设置注册成功 pipeline.fireChannelRegistered();//引发channelRegistered事件,这会导致初始化Handler的channelRegistered被调用 // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) {//如果channel可用,针对客户端,也就是connect成功 pipeline.fireChannelActive();//引发channelActive事件,最终注册read事件 } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
看doRegister代码
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this);//注意,这里注册的op为0,不会监听任何事件 return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
initAndRegister执行完成之后,继续看doConnect0代码
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() {//接下来的代码实在eventloop中执行,而不是用户线程 @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise);//执行connect } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
继续看connect代码,简单的调用了pipeline.connect
@Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
从tail开始
@Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }
最终会调用到head.connect()
@Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
@Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new IllegalStateException("connection attempt already made"); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive);//设置promise } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis();//支持连接超时机制 if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
客户端的isActive()
@Override public boolean isActive() { SocketChannel ch = javaChannel(); return ch.isOpen() && ch.isConnected(); }
服务端的isActive()
@Override public boolean isActive() { return javaChannel().socket().isBound(); }
看一下doConnect代码
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress);//执行真正的异步connect if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT);//如果没有注册成功,就注册OP_CONNECT事件 } success = true; return connected; } finally { if (!success) { doClose(); } } }
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) {//如果connect成功 13 pipeline().fireChannelActive();//最终会注册read事件,细节如下 14 } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } }
@Override public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read();//pipeline.read()-->tail.read()-->****-->head.read()-->unsafe.beginRead()-->doBeginRead()-->real操作 } return this; }
[b]四、看一下如何获取异步连接结果的[/b]
在NioEventLoop的循环中,可以看到如下代码:
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
当发生OP_CONNECT事件时,最终会调用unsafe.finishConnect,代码如下
@Override public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop();//确保该操作是在eventLoop线程中的 try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
@Override protected void doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) {//判断JDK的SocketChannel连接结果,返回true表示连接成功 throw new Error(); } }
判断JDK的SocketChannel连接结果,返回true表示连接成功
public boolean finishConnect() throws IOException { Object var1 = this.readLock; synchronized(this.readLock) { Object var2 = this.writeLock; synchronized(this.writeLock) { Object var3 = this.stateLock; boolean var10000; synchronized(this.stateLock) { if(!this.isOpen()) { throw new ClosedChannelException(); } if(this.state == 2) { var10000 = true; return var10000; } if(this.state != 1) { throw new NoConnectionPendingException(); } } int var41 = 0; Object var4; try { label525: { boolean var29 = false; boolean var6; label506: { try { var29 = true; this.begin(); synchronized(this.blockingLock()) { label480: { label494: { Object var5 = this.stateLock; synchronized(this.stateLock) { if(!this.isOpen()) { var6 = false; break label494; } this.readerThread = NativeThread.current(); } if(!this.isBlocking()) { do { var41 = checkConnect(this.fd, false, this.readyToConnect); } while(var41 == -3 && this.isOpen()); } else { do { while(true) { var41 = checkConnect(this.fd, true, this.readyToConnect); if(var41 == 0) { continue; } break; } } while(var41 == -3 && this.isOpen()); } var29 = false; break label480; } var29 = false; break label506; } } } finally { if(var29) { Object var13 = this.stateLock; synchronized(this.stateLock) { this.readerThread = 0L; if(this.state == 3) { this.kill(); var41 = 0; } } this.end(var41 > 0 || var41 == -2); assert IOStatus.check(var41); } } var4 = this.stateLock; synchronized(this.stateLock) { this.readerThread = 0L; if(this.state == 3) { this.kill(); var41 = 0; } } this.end(var41 > 0 || var41 == -2); assert IOStatus.check(var41); break label525; } Object var7 = this.stateLock; synchronized(this.stateLock) { this.readerThread = 0L; if(this.state == 3) { this.kill(); var41 = 0; } } this.end(var41 > 0 || var41 == -2); assert IOStatus.check(var41); return var6; } } catch (IOException var38) { this.close(); throw var38; } if(var41 > 0) { var4 = this.stateLock; synchronized(this.stateLock) { this.state = 2; if(this.isOpen()) { this.localAddress = Net.localAddress(this.fd); } } var10000 = true; return var10000; } else { var10000 = false; return var10000; } } } }
fulfillConnectPromise会出发链接激活事件
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) { pipeline().fireChannelActive();//参照前面的说明 } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } }
五、write过程
由于在服务端启动过程中已经多次分析了channel的read执行过程,因此在这里单独分析一下channel的write过程。首先看一下channe接口中关于write方法的定义:
/** * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. */ ChannelFuture write(Object msg);
其在AbstractChannel中的实现为:
@Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }
继续深入
@Override public ChannelFuture write(Object msg) { return tail.write(msg); }
事件进入pipeline之后,会从tail context开始向前传播(因为write是个outbound事件)
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
继续
@Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } write(msg, false, promise);//false表示不flush缓冲区的意思 return promise; }
继续
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeWrite(msg, promise); if (flush) { next.invokeFlush(); } } else { int size = channel.estimatorHandle().size(msg); if (size > 0) { ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { buffer.incrementPendingOutboundBytes(size); } } Runnable task; if (flush) { task = WriteAndFlushTask.newInstance(next, msg, size, promise); } else { task = WriteTask.newInstance(next, msg, size, promise); } safeExecute(executor, task, promise, msg); } }
看一下findContextOutbound的实现
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
找到下一个OutBound类型的Context之后,会调用Context中的Handler
private void invokeWrite(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
继续看handler的write实现
/** * Calls {@link ChannelHandlerContext#write(Object)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
可以看到,默认的实现是将事件继续沿pipeline向前传播,最终会传到head Context
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
unsafe会执行真正的IO操作
@Override public final void write(Object msg, ChannelPromise promise) { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
可以看到,unsafe的write操作并不是真正的将数据发送出去,而是在环形缓冲区中进行缓存。当channel调用flush时,最终会执行unsafe的flush
@Override public final void flush() { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
addFlush仅仅是对之前缓存的Message进行标记
/** * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed * and so you will be able to handle them. */ public void addFlush() { // There is no need to process all entries if there was already a flush before and no new messages // where added in the meantime. // // See https://github.com/netty/netty/issues/2577 Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
接下来看一下真正的flush操作
protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { boolean close = t instanceof IOException && config().isAutoClose(); // We do not want to trigger channelWritabilityChanged event if the channel is going to be closed. outboundBuffer.failFlushed(t, !close); if (close) { close(voidPromise()); } } finally { inFlush0 = false; } }
doWrite执行真正的写操作
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); if (size == 0) { // All written so clear OP_WRITE clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // Only one ByteBuf so use non-gathering write ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } }
protected final void clearOpWrite() { final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { in.remove(); continue; } boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = region.transfered() >= region.count(); boolean setOpWrite = false; if (!done) { long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i--) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (region.transfered() >= region.count()) { done = true; break; } } in.progress(flushedAmount); } if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break; } } else { // Should not reach here. throw new Error(); } } }
@Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
/** * Notify the {@link ChannelPromise} of the current message about writing progress. */ public void progress(long amount) { Entry e = flushedEntry; assert e != null; ChannelPromise p = e.promise; if (p instanceof ChannelProgressivePromise) { long progress = e.progress + amount; e.progress = progress; ((ChannelProgressivePromise) p).tryProgress(progress, e.total); } }
@Override public boolean tryProgress(long progress, long total) { if (total < 0) { total = -1; if (progress < 0 || isDone()) { return false; } } else if (progress < 0 || progress > total || isDone()) { return false; } notifyProgressiveListeners(progress, total); return true; }
相关文章推荐
- 【Netty源码分析】客户端connect服务端过程
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- Netty源码分析之Bootstrap启动过程分析
- Netty5源码分析--2.客户端启动过程
- 【Netty源码分析】客户端connect服务端过程
- Netty源码分析:服务端启动全过程(篇幅很长)
- Netty源码分析之服务端启动过程
- 【Netty源码分析】客户端connect服务端过程
- Netty系列-客户端启动源码分析
- nginx源码分析(10)-启动过程分析
- Nginx源码分析---Nginx启动初始化过程(一)
- nginx源码分析(1)——启动过程
- u-boot启动过程分析(源码)
- Linux内核源码分析--内核启动命令行的传递过程(Linux-3.0 ARMv7)
- android 4.04的应用程序启动过程及与Zygote的交互(基于静态源码分析)
- Nginx源码分析---Nginx启动初始化过程(二)
- Amoeba源码分析一:启动过程分析
- Nginx源码分析-启动初始化过程(二)
- Hbase 源码分析5--Master启动过程
- nginx源码分析(10)-启动过程分析