Netty的ChannelFuture和ChannelPromise
2017-12-31 21:26
399 查看
在netty中可以通过channelFuture和channelPromise来实现异步操作。,可以通过官方给出的channelFuture的注释来看到关于channelFuture的状态变化。
* <pre>
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = <b>true</b> |
* +--------------------------+ | | isSuccess() = <b>true</b> |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = <b>false</b> | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = <b>true</b> |
* | isCancelled() = false | | | cause() = <b>non-null</b> |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = <b>true</b> |
* | isCancelled() = <b>true</b> |
* +---------------------------+
* </pre>
可以由此看到,channelFuture分为了四个状态,初始状态为uncompleted,isDone()所返回的状态为false,isSuccess()返回的状态为true,isCancelled()返回的状态为false,cause()所返回的异常为null。
接下来可以看到,可以改变为完成成功,完成失败,完成中被取消三个状态。
从上面也可以看到三个状态的改变也会改变channelFuture的三个状态量。首先不管状态如何改变,只要从未完成状态改变,isDone()都会变为true,然后成功isSuccess()会是true,被取消则isCancelled()则会变成true,异常发生则会cause()返回相应的异常类型。
以上就是channelFuture的基本,而channelPromise作为channelFuture的扩展,在实际作用中发挥上面的功能。
可以从channel的注册看到future的使用的例子。
在注册的步骤达到SingleThreadEventLoop时,则会根据要注册的channel生成相应的channelPromise。
这里的构造方法也表明了channelPromise主要需要的两个成员,毫无疑问,一个就是所对应的需要作出异步处理的channel还有就是这个channel所对应的eve
b65f
ntloop。
在注册的过程中,往下面可以看出如果之前的evnetloop所对应的线程与当前的线程不一样的话,真正的注册流程则会作为task任务放入eventloop的阻塞队列异步进行。此时,整个注册操作就会异步进行,那么在注册完毕后,如果需要该channel用来connect或者bind的时候,怎么保证channel的注册状态呢,这个时候channelPromise起到了作用。看到absactUnsafe的register0()方法,值得一提的是一般情况下这个方法已经是异步进行当中的了。
doRegister()方法作为具体的注册业务逻辑,在这之后则会看到safeSetSuccess()方法,而在catch块中也可以看到safeSetFailure()方法,顾名思义,这里的状态变迁可以很清楚的联想到channelFuture那张图的变化。
只有在注册完成之后才会调用的safeSetSuccess()方法表明了对注册状态的成功的标记,可以看到这个方法,这个方法调用了channelPromise的trySuccess()方法。
而在channelPromise的trySuccess()方法中,会调用setSuccess0()方法来对状态进行标记。
由于是异步的情况下,需要保证这里的线程安全,如果这时isDone()表明还未完成,则会把状态量result标位success。
而异常的状态标记也与成功类似。
在之后,准备给channel去connect或者bind的时候,就会判断channelPromise的isDone()来选择异步放进eventloop的阻塞队列还是直接进行下面的操作。
但是,如果在需要promise完成时,真正的注册还没有完成,怎么设置在异步等待完成时候的代码逻辑?这个时候就需要listener。
看到abstractBootstrap的doBInd()方法,在完成注册步骤之后,要采用doBind0()来继续绑定的逻辑,之前,先会调用isDone()来判断是否已经将注册完成,如果没有,则会给channelPromise添加一个监听器,实现operationComplete()方法,里面实现了如果前面步骤已经结束,则会调用doBind0()方法,当然,也可以看到,在之前的步骤中,已经把注册时候的channelPromise设置为已经完成,这个时候就会生成一个新的promise,来保证接下里的异步作用。
那么是何时通知监听器的呢?
现在可以看DefaultPromsie中完整的trySuccess()方法。
在通过setSuccess0()方法完成状态量的设置之后,就会通过notifyListeners()方法来通知相应的监听器去执行相应的operationComplete()方法。值得一提的是,如果在添加listener的时候会判断isDone(),如果此时已经完成了相应的异步操作,则会直接调用监听器的方法。
* <pre>
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = <b>true</b> |
* +--------------------------+ | | isSuccess() = <b>true</b> |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = <b>false</b> | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = <b>true</b> |
* | isCancelled() = false | | | cause() = <b>non-null</b> |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = <b>true</b> |
* | isCancelled() = <b>true</b> |
* +---------------------------+
* </pre>
可以由此看到,channelFuture分为了四个状态,初始状态为uncompleted,isDone()所返回的状态为false,isSuccess()返回的状态为true,isCancelled()返回的状态为false,cause()所返回的异常为null。
接下来可以看到,可以改变为完成成功,完成失败,完成中被取消三个状态。
从上面也可以看到三个状态的改变也会改变channelFuture的三个状态量。首先不管状态如何改变,只要从未完成状态改变,isDone()都会变为true,然后成功isSuccess()会是true,被取消则isCancelled()则会变成true,异常发生则会cause()返回相应的异常类型。
以上就是channelFuture的基本,而channelPromise作为channelFuture的扩展,在实际作用中发挥上面的功能。
可以从channel的注册看到future的使用的例子。
在注册的步骤达到SingleThreadEventLoop时,则会根据要注册的channel生成相应的channelPromise。
public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); }
这里的构造方法也表明了channelPromise主要需要的两个成员,毫无疑问,一个就是所对应的需要作出异步处理的channel还有就是这个channel所对应的eve
b65f
ntloop。
在注册的过程中,往下面可以看出如果之前的evnetloop所对应的线程与当前的线程不一样的话,真正的注册流程则会作为task任务放入eventloop的阻塞队列异步进行。此时,整个注册操作就会异步进行,那么在注册完毕后,如果需要该channel用来connect或者bind的时候,怎么保证channel的注册状态呢,这个时候channelPromise起到了作用。看到absactUnsafe的register0()方法,值得一提的是一般情况下这个方法已经是异步进行当中的了。
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
doRegister()方法作为具体的注册业务逻辑,在这之后则会看到safeSetSuccess()方法,而在catch块中也可以看到safeSetFailure()方法,顾名思义,这里的状态变迁可以很清楚的联想到channelFuture那张图的变化。
只有在注册完成之后才会调用的safeSetSuccess()方法表明了对注册状态的成功的标记,可以看到这个方法,这个方法调用了channelPromise的trySuccess()方法。
而在channelPromise的trySuccess()方法中,会调用setSuccess0()方法来对状态进行标记。
private boolean setSuccess0(V result) { if (isDone()) { return false; } synchronized (this) { // Allow only once. if (isDone()) { return false; } if (result == null) { this.result = SUCCESS; } else { this.result = result; } if (hasWaiters()) { notifyAll(); } } return true; }
由于是异步的情况下,需要保证这里的线程安全,如果这时isDone()表明还未完成,则会把状态量result标位success。
而异常的状态标记也与成功类似。
在之后,准备给channel去connect或者bind的时候,就会判断channelPromise的isDone()来选择异步放进eventloop的阻塞队列还是直接进行下面的操作。
但是,如果在需要promise完成时,真正的注册还没有完成,怎么设置在异步等待完成时候的代码逻辑?这个时候就需要listener。
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and succesful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
看到abstractBootstrap的doBInd()方法,在完成注册步骤之后,要采用doBind0()来继续绑定的逻辑,之前,先会调用isDone()来判断是否已经将注册完成,如果没有,则会给channelPromise添加一个监听器,实现operationComplete()方法,里面实现了如果前面步骤已经结束,则会调用doBind0()方法,当然,也可以看到,在之前的步骤中,已经把注册时候的channelPromise设置为已经完成,这个时候就会生成一个新的promise,来保证接下里的异步作用。
那么是何时通知监听器的呢?
现在可以看DefaultPromsie中完整的trySuccess()方法。
public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }
在通过setSuccess0()方法完成状态量的设置之后,就会通过notifyListeners()方法来通知相应的监听器去执行相应的operationComplete()方法。值得一提的是,如果在添加listener的时候会判断isDone(),如果此时已经完成了相应的异步操作,则会直接调用监听器的方法。
相关文章推荐
- Netty学习笔记(二) Channel和ChannelFuture
- Netty源码之ChannelPromise回调通知问题
- LocalChannel in Netty
- netty源码分析之ChannelFuture
- [netty核心类]--Channel和Unsafe类
- netty的channel配置参数详解
- Netty 快速入门系列 - Chapter 3 Netty5.x【第七讲】 - Channel线程安全?
- Netty学习之旅------写事件处理NioSocketChannel、ChannelOutbondBuffer源码分析
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty In Action中文版 - 第八章:附带的ChannelHandler和Codec
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- netty之SimpleChannelInboundHandler
- Netty利用ChannelGroup广播消息
- Netty:Channel
- Netty(二)ChannelPipeline和ChannelHandler
- Elasticsearch class: org/jboss/netty/channel/socket/nio/NioWorkerPool
- Netty学习四:Channel
- java netty之ChannelInitializer
- netty4.0.x源码分析—channel
- netty源码分析之ChannelPipeline