Netty源码之ChannelPromise回调通知问题
2017-07-04 00:00
531 查看
摘要: 本文主要结合netty源码分析一个关于ChannelPromise的回调问题,其中内容涉及Netty线程模型和future机制。文中源码基于Netty4.1
先抛出写此文章的问题描述:
客户端调用Connect连接服务端时,在执行doResolveAndConnect期间,当regFuture的isDone返回false时,Netty会在regFuture中注册监听器用于Channel初始化和注册成功之后回调,并且新创建了一个PendingRegistrationPromise的实例,那么为什么需要创建一个PendingRegistrationPromise的实例呢?作用是什么?下文将重点解答此问题。
回顾下connect过程的方法调用链:
connect->doResolveAndConnect->initAndRegister->SingleThreadEventLoop.register->AbstractUnsafe.register->AbstractUnsafe.register0->doResolveAndConnect0 ......
首先,对启动客户端的代码进行改造:
意图很简单,connect完成后回调,打印出执行operationComplete的线程和future的类型。
同时,在方法doResolveAndConnect、initAndRegister、SingleThreadEventLoop.register、AbstractUnsafe.register和AbstractUnsafe.register0中加入代码,打印当前线程:
正常启动客户端,查看输出:
输出结果很简单,connect过程中,将会在main线程中执行doResolveAndConnect、initAndRegister、SingleThreadEventLoop.register、AbstractUnsafe.register,直到调用AbstractUnsafe.register0方法注册当前Channel到I/O线程的多路复用器时,register0将会进入I/O线程执行,这点由AbstractUnsafe.register源码可知道:
这段代码的含义是,如果当前线程是当前Channel的I/O线程,直接调用register0;否则,封装register0在Runnable中提交到任务队列交由当前Channel的I/O线程执行。同时,由main方法中加入的监听器回调方法输出可知,客户端ChannelFuture中注册的监听者由I/O线程调用operationComplete进行通知,其中future对象的类型为DefaultChannelPromise,这点由SingleThreadEventLoop.register源码可知:
register方法使用当前Channel对象和当前NioEventLoop对象来构造DefaultChannelPromise的实例,因此DefaultChannelPromise实例将使用该NioEventLoop来作为通知监听者的executor。
那么doResolveAndConnect何时会进入else代码块呢?
由于main线程在AbstractUnsafe.register中会将register0交给I/O线程执行,main线程将会立即返回,进入doResolveAndConnect中检测从initAndRegister返回的regFuture对象,即DefaultChannelPromise的实例,若regFuture的isDone返回false,表明I/O线程还在执行register0的过程中,任务未完成;那么main线程将会进入else代码块,创建PendingRegistrationPromise实例,同时在regFuture上注册监听器,待I/O线程完成任务后通知,并将PendingRegistrationPromise实例返回客户端。
下面分析创建并返回PendingRegistrationPromise实例的作用。
首先修改监听器的回调方法operationComplete,打印出当前线程:
同时,修改源码,让I/O线程在执行register0过程中睡眠1s:
重新启动客户端,查看输出:
由输出可知,I/O线程完成之后,将会通知doResolveAndConnect中注册regFuture上的监听者,并负责回调operationComplete方法。这里regFuture类型为DefaultChannelPromise,也就是register中创建的对象。而doResolveAndConnect返回的是PendingRegistrationPromise的实例,由main方法中注册的监听器operationComplete输出可知,doResolveAndConnect返回的是PendingRegistrationPromise的实例。此外,main方法中监听器的operationComplete由I/O线程负责调用,这点后面会解释。
现在修改register0源码,在其中抛出异常:
启动客户端,查看输出:
这里输出省略了异常堆栈,由输出可知,main方法中注册的监听器回调方法输出发生了变化,回调监听器方法operationComplete的线程变了,变成了GlobalEventExecutor。查看PendingRegistrationPromise源码:
由PendingRegistrationPromise源码可知,其继承自DefaultChannelPromise ,同时改写了executor()方法,executor()用于返回与当前 promise相关联的的执行器,通常是I/O线程,promise将通过该执行器通知注册在该promise上的监听者。这里改写了executor(),只有当registered为true时才会使用I/O线程,否则使用默认的GlobalEventExecutor。当构造PendingRegistrationPromise实例时,默认会初始化registered为false,只有调用promise.registered()后才会设置为True。
由于I/O线程调用register0发生了异常,在调用doResolveAndConnect中operationComplete方法时,cause != null成立,promise.registered()不会被调用,因此main方法中监听器的operationComplete由PendingRegistrationPromise的GlobalEventExecutor执行器负责调用。当I/O线程执行register0正常完成时,promise.registered()被调用,executor()返回的执行器即为当前Channel的EventLoop。
由此说明,在Channel注册未完成的情况下,创建PendingRegistrationPromise实例的作用是待I/O线程调用register0正常完成之后通知main方法中注册的监听者,同时,当I/O线程抛出异常时,使用默认的GlobalEventExecutor通知。I/O线程调用register0成功返回时,需要切换为使用I/O线程发送通知,不能使用默认的GlobalEventExecutor,Netty源码提供了关于这个bug的说明,链接如下:
那么为什么要用PendingRegistrationPromise的实例,而不直接使用regFuture呢?
当进入doResolveAndConnect的else代码块时,regFuture注册完监听器之后,doResolveAndConnect会立马返回,main线程等待回调;当I/O线程执行完register0后,将会回调regFuture上监听者的operationComplete方法,若doResolveAndConnect的else代码块直接返回regFuture时,I/O线程也会负责回调main方法中的operationComplete,包括解析并创建连接都将由I/O线程来完成。
那么问题来了,当I/O线程先执行main方法中的operationComplete方法时,doResolveAndConnect0中的连接操作可能还未被执行,如果connection还未建立, f.channel().closeFuture().sync()将不会阻塞直到connection关闭,而是马上返回,main线程将会退出。main线程都退出了,I/O线程线程啥的不就直接Over啦。因此使用PendingRegistrationPromise的实例向main线程发送通知,当register0失败时,promise负责传递regFuture中的异常到main线程;当register0成功时,promise需要等待doResolveAndConnect0完成后再回调main方法中监听器的operationComplete。关于这一点,doResolveAndConnect中就算进入if代码块,也会调用channel.newPromise()创建一个新的promise实例通知客户端。
看示例,修改源码,doResolveAndConnect的else代码块中不返回PendingRegistrationPromise实例,而是返回regFuture,去掉register0中抛出异常的代码:
启动客户端,输出:
由于服务端未启动,正常情况下应该抛出异常,而现在直接main方法退出。
本文中的内容涉及Channel的初始化过程,Netty的线程模型和Future机制,有问题可随时与笔者联系。
欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/1154326
先抛出写此文章的问题描述:
客户端调用Connect连接服务端时,在执行doResolveAndConnect期间,当regFuture的isDone返回false时,Netty会在regFuture中注册监听器用于Channel初始化和注册成功之后回调,并且新创建了一个PendingRegistrationPromise的实例,那么为什么需要创建一个PendingRegistrationPromise的实例呢?作用是什么?下文将重点解答此问题。
回顾下connect过程的方法调用链:
connect->doResolveAndConnect->initAndRegister->SingleThreadEventLoop.register->AbstractUnsafe.register->AbstractUnsafe.register0->doResolveAndConnect0 ......
首先,对启动客户端的代码进行改造:
ChannelFuture f = b.connect(HOST, PORT); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println(Thread.currentThread()); System.out.println(future.getClass()); } }); f.sync();
意图很简单,connect完成后回调,打印出执行operationComplete的线程和future的类型。
同时,在方法doResolveAndConnect、initAndRegister、SingleThreadEventLoop.register、AbstractUnsafe.register和AbstractUnsafe.register0中加入代码,打印当前线程:
System.out.println(Thread.currentThread());
正常启动客户端,查看输出:
---------------doResolveAndConnect-----------------Thread[main,5,main] ---------------initAndRegister-----------------Thread[main,5,main] ---------------SingleThreadEventLoop.register-----------------Thread[main,5,main] ---------------AbstractUnsafe.register-----------------Thread[main,5,main] ---------------AbstractUnsafe.register0-----------------Thread[nioEventLoopGroup-2-1,10,main] Thread[nioEventLoopGroup-2-1,10,main] class io.netty.channel.DefaultChannelPromise
输出结果很简单,connect过程中,将会在main线程中执行doResolveAndConnect、initAndRegister、SingleThreadEventLoop.register、AbstractUnsafe.register,直到调用AbstractUnsafe.register0方法注册当前Channel到I/O线程的多路复用器时,register0将会进入I/O线程执行,这点由AbstractUnsafe.register源码可知道:
...... if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ...... } } ......
这段代码的含义是,如果当前线程是当前Channel的I/O线程,直接调用register0;否则,封装register0在Runnable中提交到任务队列交由当前Channel的I/O线程执行。同时,由main方法中加入的监听器回调方法输出可知,客户端ChannelFuture中注册的监听者由I/O线程调用operationComplete进行通知,其中future对象的类型为DefaultChannelPromise,这点由SingleThreadEventLoop.register源码可知:
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
register方法使用当前Channel对象和当前NioEventLoop对象来构造DefaultChannelPromise的实例,因此DefaultChannelPromise实例将使用该NioEventLoop来作为通知监听者的executor。
那么doResolveAndConnect何时会进入else代码块呢?
由于main线程在AbstractUnsafe.register中会将register0交给I/O线程执行,main线程将会立即返回,进入doResolveAndConnect中检测从initAndRegister返回的regFuture对象,即DefaultChannelPromise的实例,若regFuture的isDone返回false,表明I/O线程还在执行register0的过程中,任务未完成;那么main线程将会进入else代码块,创建PendingRegistrationPromise实例,同时在regFuture上注册监听器,待I/O线程完成任务后通知,并将PendingRegistrationPromise实例返回客户端。
下面分析创建并返回PendingRegistrationPromise实例的作用。
首先修改监听器的回调方法operationComplete,打印出当前线程:
regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); System.out.println("---------------operationComplete-----------------"+future.getClass()); System.out.println("---------------operationComplete-----------------"+Thread.currentThread()); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } });
同时,修改源码,让I/O线程在执行register0过程中睡眠1s:
private void register0(ChannelPromise promise) { try { ...... System.out.println("---------------AbstractUnsafe.register0---------"+Thread.currentThread()); Thread.sleep(1000); safeSetSuccess(promise); pipeline.fireChannelRegistered(); ...... } catch (Throwable t) { ...... } }
重新启动客户端,查看输出:
---------------doResolveAndConnect-----------------Thread[main,5,main] ---------------initAndRegister-----------------Thread[main,5,main] ---------------SingleThreadEventLoop.register-----------------Thread[main,5,main] ---------------AbstractUnsafe.register-----------------Thread[main,5,main] ---------------AbstractUnsafe.register0-----------------Thread[nioEventLoopGroup-2-1,10,main] ---------------operationComplete-----------------class io.netty.channel.DefaultChannelPromise ---------------operationComplete-----------------Thread[nioEventLoopGroup-2-1,10,main] Thread[nioEventLoopGroup-2-1,10,main] class io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise
由输出可知,I/O线程完成之后,将会通知doResolveAndConnect中注册regFuture上的监听者,并负责回调operationComplete方法。这里regFuture类型为DefaultChannelPromise,也就是register中创建的对象。而doResolveAndConnect返回的是PendingRegistrationPromise的实例,由main方法中注册的监听器operationComplete输出可知,doResolveAndConnect返回的是PendingRegistrationPromise的实例。此外,main方法中监听器的operationComplete由I/O线程负责调用,这点后面会解释。
现在修改register0源码,在其中抛出异常:
private void register0(ChannelPromise promise) { try { ...... System.out.println("---------------AbstractUnsafe.register0---------"+Thread.currentThread()); throw new Exception(); //safeSetSuccess(promise); //pipeline.fireChannelRegistered(); ...... } catch (Throwable t) { ...... } }
启动客户端,查看输出:
---------------doResolveAndConnect-----------------Thread[main,5,main] ---------------initAndRegister-----------------Thread[main,5,main] ---------------SingleThreadEventLoop.register-----------------Thread[main,5,main] ---------------AbstractUnsafe.register-----------------Thread[main,5,main] ---------------AbstractUnsafe.register0-----------------Thread[nioEventLoopGroup-2-1,10,main] ---------------operationComplete-----------------class io.netty.channel.DefaultChannelPromise ---------------operationComplete-----------------Thread[nioEventLoopGroup-2-1,10,main] Thread[globalEventExecutor-1-1,5,main] class io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise
这里输出省略了异常堆栈,由输出可知,main方法中注册的监听器回调方法输出发生了变化,回调监听器方法operationComplete的线程变了,变成了GlobalEventExecutor。查看PendingRegistrationPromise源码:
static final class PendingRegistrationPromise extends DefaultChannelPromise { private volatile boolean registered; PendingRegistrationPromise(Channel channel) { super(channel); } void registered() { registered = true; } @Override protected EventExecutor executor() { if (registered) { return super.executor(); } return GlobalEventExecutor.INSTANCE; } }
由PendingRegistrationPromise源码可知,其继承自DefaultChannelPromise ,同时改写了executor()方法,executor()用于返回与当前 promise相关联的的执行器,通常是I/O线程,promise将通过该执行器通知注册在该promise上的监听者。这里改写了executor(),只有当registered为true时才会使用I/O线程,否则使用默认的GlobalEventExecutor。当构造PendingRegistrationPromise实例时,默认会初始化registered为false,只有调用promise.registered()后才会设置为True。
由于I/O线程调用register0发生了异常,在调用doResolveAndConnect中operationComplete方法时,cause != null成立,promise.registered()不会被调用,因此main方法中监听器的operationComplete由PendingRegistrationPromise的GlobalEventExecutor执行器负责调用。当I/O线程执行register0正常完成时,promise.registered()被调用,executor()返回的执行器即为当前Channel的EventLoop。
由此说明,在Channel注册未完成的情况下,创建PendingRegistrationPromise实例的作用是待I/O线程调用register0正常完成之后通知main方法中注册的监听者,同时,当I/O线程抛出异常时,使用默认的GlobalEventExecutor通知。I/O线程调用register0成功返回时,需要切换为使用I/O线程发送通知,不能使用默认的GlobalEventExecutor,Netty源码提供了关于这个bug的说明,链接如下:
https://github.com/netty/netty/issues/2586
那么为什么要用PendingRegistrationPromise的实例,而不直接使用regFuture呢?
当进入doResolveAndConnect的else代码块时,regFuture注册完监听器之后,doResolveAndConnect会立马返回,main线程等待回调;当I/O线程执行完register0后,将会回调regFuture上监听者的operationComplete方法,若doResolveAndConnect的else代码块直接返回regFuture时,I/O线程也会负责回调main方法中的operationComplete,包括解析并创建连接都将由I/O线程来完成。
那么问题来了,当I/O线程先执行main方法中的operationComplete方法时,doResolveAndConnect0中的连接操作可能还未被执行,如果connection还未建立, f.channel().closeFuture().sync()将不会阻塞直到connection关闭,而是马上返回,main线程将会退出。main线程都退出了,I/O线程线程啥的不就直接Over啦。因此使用PendingRegistrationPromise的实例向main线程发送通知,当register0失败时,promise负责传递regFuture中的异常到main线程;当register0成功时,promise需要等待doResolveAndConnect0完成后再回调main方法中监听器的operationComplete。关于这一点,doResolveAndConnect中就算进入if代码块,也会调用channel.newPromise()创建一个新的promise实例通知客户端。
看示例,修改源码,doResolveAndConnect的else代码块中不返回PendingRegistrationPromise实例,而是返回regFuture,去掉register0中抛出异常的代码:
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { System.out.println("---------------doResolveAndConnect-----------------"+Thread.currentThread()); final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); ...... return regFuture; } }
启动客户端,输出:
Disconnected from the target VM, address: '127.0.0.1:60198', transport: 'socket'
由于服务端未启动,正常情况下应该抛出异常,而现在直接main方法退出。
本文中的内容涉及Channel的初始化过程,Netty的线程模型和Future机制,有问题可随时与笔者联系。
欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/1154326
相关文章推荐
- netty源码分析之ChannelFuture
- netty源码分析之ChannelHandler
- 我们来谈谈promise,讨论一下如何优雅的避免多层回调嵌套的问题
- 支付宝支付后回调通知中responseTxt=true isSign=False可能的问题
- 支付宝支付后回调通知中responseTxt=true isSign=False可能的问题
- jersey处理支付宝异步回调通知的问题:java.lang.IllegalArgumentException: Error parsing media type 'application/x-www-form-urlencoded; text/html; charset=UTF-8'
- 【Netty源码学习】DefaultChannelPipeline(三)
- 深入理解Promise框架(解决js中的回调地域问题!)
- ClassNotFoundException, org.jboss.netty.channel.ChannelPipelineFactory 缺少jar包【远程调用dubbo分布式服务框架遇到的问题】
- netty核心源码 一 ChannelPipeline
- Netty源码解读(三)Channel与Pipeline
- Netty3 源码分析 - ChannelHandlerContext
- Netty3 源码分析 - Channel
- Netty 源码分析之SimpleChannelInboundHandler
- Netty 4.0 源码分析(三):Channel和ChannelPipeline
- netty源码分析之ChannelPipeline
- Netty源码解读 Channel与Pipeline
- Netty3 源码分析 - ChannelUpstreamHandler
- 微信扫码支付notify_url回调接收通知问题
- netty核心源码 三 DefaultChannelPipeline