您的位置:首页 > 其它

Netty源码之ChannelPromise回调通知问题

2017-07-04 00:00 531 查看
摘要: 本文主要结合netty源码分析一个关于ChannelPromise的回调问题,其中内容涉及Netty线程模型和future机制。文中源码基于Netty4.1

先抛出写此文章的问题描述:

客户端调用Connect连接服务端时,在执行doResolveAndConnect期间,当regFuture的isDone返回false时,Netty会在regFuture中注册监听器用于Channel初始化和注册成功之后回调,并且新创建了一个PendingRegistrationPromise的实例,那么为什么需要创建一个PendingRegistrationPromise的实例呢?作用是什么?下文将重点解答此问题。

回顾下connect过程的方法调用链:

connect->doResolveAndConnect->initAndRegister->SingleThreadEventLoop.register->AbstractUnsafe.register->AbstractUnsafe.register0->doResolveAndConnect0 ......

首先,对启动客户端的代码进行改造:

ChannelFuture f = b.connect(HOST, PORT);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println(Thread.currentThread());
System.out.println(future.getClass());
}
});
f.sync();

意图很简单,connect完成后回调,打印出执行operationComplete的线程和future的类型。

同时,在方法doResolveAndConnect、initAndRegister、SingleThreadEventLoop.register、AbstractUnsafe.register和AbstractUnsafe.register0中加入代码,打印当前线程:

System.out.println(Thread.currentThread());

正常启动客户端,查看输出:

---------------doResolveAndConnect-----------------Thread[main,5,main]
---------------initAndRegister-----------------Thread[main,5,main]
---------------SingleThreadEventLoop.register-----------------Thread[main,5,main]
---------------AbstractUnsafe.register-----------------Thread[main,5,main]
---------------AbstractUnsafe.register0-----------------Thread[nioEventLoopGroup-2-1,10,main]
Thread[nioEventLoopGroup-2-1,10,main]
class io.netty.channel.DefaultChannelPromise

输出结果很简单,connect过程中,将会在main线程中执行doResolveAndConnect、initAndRegister、SingleThreadEventLoop.register、AbstractUnsafe.register,直到调用AbstractUnsafe.register0方法注册当前Channel到I/O线程的多路复用器时,register0将会进入I/O线程执行,这点由AbstractUnsafe.register源码可知道:

......
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
......
}
}
......

这段代码的含义是,如果当前线程是当前Channel的I/O线程,直接调用register0;否则,封装register0在Runnable中提交到任务队列交由当前Channel的I/O线程执行。同时,由main方法中加入的监听器回调方法输出可知,客户端ChannelFuture中注册的监听者由I/O线程调用operationComplete进行通知,其中future对象的类型为DefaultChannelPromise,这点由SingleThreadEventLoop.register源码可知:

public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

register方法使用当前Channel对象和当前NioEventLoop对象来构造DefaultChannelPromise的实例,因此DefaultChannelPromise实例将使用该NioEventLoop来作为通知监听者的executor。

那么doResolveAndConnect何时会进入else代码块呢?

由于main线程在AbstractUnsafe.register中会将register0交给I/O线程执行,main线程将会立即返回,进入doResolveAndConnect中检测从initAndRegister返回的regFuture对象,即DefaultChannelPromise的实例,若regFuture的isDone返回false,表明I/O线程还在执行register0的过程中,任务未完成;那么main线程将会进入else代码块,创建PendingRegistrationPromise实例,同时在regFuture上注册监听器,待I/O线程完成任务后通知,并将PendingRegistrationPromise实例返回客户端。

下面分析创建并返回PendingRegistrationPromise实例的作用。

首先修改监听器的回调方法operationComplete,打印出当前线程:

regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
System.out.println("---------------operationComplete-----------------"+future.getClass());
System.out.println("---------------operationComplete-----------------"+Thread.currentThread());
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});

同时,修改源码,让I/O线程在执行register0过程中睡眠1s:

private void register0(ChannelPromise promise) {
try {
......
System.out.println("---------------AbstractUnsafe.register0---------"+Thread.currentThread());
Thread.sleep(1000);
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
......
} catch (Throwable t) {
......
}
}

重新启动客户端,查看输出:

---------------doResolveAndConnect-----------------Thread[main,5,main]
---------------initAndRegister-----------------Thread[main,5,main]
---------------SingleThreadEventLoop.register-----------------Thread[main,5,main]
---------------AbstractUnsafe.register-----------------Thread[main,5,main]
---------------AbstractUnsafe.register0-----------------Thread[nioEventLoopGroup-2-1,10,main]
---------------operationComplete-----------------class io.netty.channel.DefaultChannelPromise
---------------operationComplete-----------------Thread[nioEventLoopGroup-2-1,10,main]
Thread[nioEventLoopGroup-2-1,10,main]
class io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise

由输出可知,I/O线程完成之后,将会通知doResolveAndConnect中注册regFuture上的监听者,并负责回调operationComplete方法。这里regFuture类型为DefaultChannelPromise,也就是register中创建的对象。而doResolveAndConnect返回的是PendingRegistrationPromise的实例,由main方法中注册的监听器operationComplete输出可知,doResolveAndConnect返回的是PendingRegistrationPromise的实例。此外,main方法中监听器的operationComplete由I/O线程负责调用,这点后面会解释。

现在修改register0源码,在其中抛出异常:

private void register0(ChannelPromise promise) {
try {
......
System.out.println("---------------AbstractUnsafe.register0---------"+Thread.currentThread());
throw new Exception();
//safeSetSuccess(promise);
//pipeline.fireChannelRegistered();
......
} catch (Throwable t) {
......
}
}

启动客户端,查看输出:

---------------doResolveAndConnect-----------------Thread[main,5,main]
---------------initAndRegister-----------------Thread[main,5,main]
---------------SingleThreadEventLoop.register-----------------Thread[main,5,main]
---------------AbstractUnsafe.register-----------------Thread[main,5,main]
---------------AbstractUnsafe.register0-----------------Thread[nioEventLoopGroup-2-1,10,main]
---------------operationComplete-----------------class io.netty.channel.DefaultChannelPromise
---------------operationComplete-----------------Thread[nioEventLoopGroup-2-1,10,main]
Thread[globalEventExecutor-1-1,5,main]
class io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise

这里输出省略了异常堆栈,由输出可知,main方法中注册的监听器回调方法输出发生了变化,回调监听器方法operationComplete的线程变了,变成了GlobalEventExecutor。查看PendingRegistrationPromise源码:

static final class PendingRegistrationPromise extends DefaultChannelPromise {
private volatile boolean registered;
PendingRegistrationPromise(Channel channel) {
super(channel);
}
void registered() {
registered = true;
}
@Override
protected EventExecutor executor() {
if (registered) {
return super.executor();
}
return GlobalEventExecutor.INSTANCE;
}
}

由PendingRegistrationPromise源码可知,其继承自DefaultChannelPromise ,同时改写了executor()方法,executor()用于返回与当前 promise相关联的的执行器,通常是I/O线程,promise将通过该执行器通知注册在该promise上的监听者。这里改写了executor(),只有当registered为true时才会使用I/O线程,否则使用默认的GlobalEventExecutor。当构造PendingRegistrationPromise实例时,默认会初始化registered为false,只有调用promise.registered()后才会设置为True。

由于I/O线程调用register0发生了异常,在调用doResolveAndConnect中operationComplete方法时,cause != null成立,promise.registered()不会被调用,因此main方法中监听器的operationComplete由PendingRegistrationPromise的GlobalEventExecutor执行器负责调用。当I/O线程执行register0正常完成时,promise.registered()被调用,executor()返回的执行器即为当前Channel的EventLoop。

由此说明,在Channel注册未完成的情况下,创建PendingRegistrationPromise实例的作用是待I/O线程调用register0正常完成之后通知main方法中注册的监听者,同时,当I/O线程抛出异常时,使用默认的GlobalEventExecutor通知。I/O线程调用register0成功返回时,需要切换为使用I/O线程发送通知,不能使用默认的GlobalEventExecutor,Netty源码提供了关于这个bug的说明,链接如下:

https://github.com/netty/netty/issues/2586

那么为什么要用PendingRegistrationPromise的实例,而不直接使用regFuture呢?

当进入doResolveAndConnect的else代码块时,regFuture注册完监听器之后,doResolveAndConnect会立马返回,main线程等待回调;当I/O线程执行完register0后,将会回调regFuture上监听者的operationComplete方法,若doResolveAndConnect的else代码块直接返回regFuture时,I/O线程也会负责回调main方法中的operationComplete,包括解析并创建连接都将由I/O线程来完成。

那么问题来了,当I/O线程先执行main方法中的operationComplete方法时,doResolveAndConnect0中的连接操作可能还未被执行,如果connection还未建立, f.channel().closeFuture().sync()将不会阻塞直到connection关闭,而是马上返回,main线程将会退出。main线程都退出了,I/O线程线程啥的不就直接Over啦。因此使用PendingRegistrationPromise的实例向main线程发送通知,当register0失败时,promise负责传递regFuture中的异常到main线程;当register0成功时,promise需要等待doResolveAndConnect0完成后再回调main方法中监听器的operationComplete。关于这一点,doResolveAndConnect中就算进入if代码块,也会调用channel.newPromise()创建一个新的promise实例通知客户端。

看示例,修改源码,doResolveAndConnect的else代码块中不返回PendingRegistrationPromise实例,而是返回regFuture,去掉register0中抛出异常的代码:

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
System.out.println("---------------doResolveAndConnect-----------------"+Thread.currentThread());
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
......
return regFuture;
}
}

启动客户端,输出:

Disconnected from the target VM, address: '127.0.0.1:60198', transport: 'socket'

由于服务端未启动,正常情况下应该抛出异常,而现在直接main方法退出。

本文中的内容涉及Channel的初始化过程,Netty的线程模型和Future机制,有问题可随时与笔者联系。

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/1154326
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息