您的位置:首页 > 其它

dubbo服务端处理请求源码分析

2018-07-17 20:28 190 查看

在duubo服务成功发布之后是如何处理服务调用请求的呢,这块的代码结构简单来说就是基于handler的调用链,每一个handler持有下一个handler的引用,执行完当前handler则调用下一个handler的handle方法。

基于调用链模式,提供对于服务请求的多种处理。回到服务发布的代码,分析handler是何时封装的。在另外一篇dubbo服务发布过程源码分析文章分析服务发布过程中,已经知道在DubboProtocol. createServer方法进行首次的handler传递,用于处理服务请求,在createServer方法有如下代码。

server = Exchangers.bind(url, requestHandler);

进一步分析这个requestHandler的定义,是在DubboProtocol中使用匿名方式创建的实例,如下所示。

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}

@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}

@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isInfoEnabled()) {
logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}

private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}

private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
}

Requesthandler作为参数继续传递进行服务发布,在后续还会继续包装别的handler,因此request是整个handler调用链最后一个执行处理器,暂时先不看这里面的方法的作用。继续跟踪server = Exchangers.bind(url, requestHandler),按照上一篇dubbo服务发布过程文章的分析,可以知道会进入到HeaderExchanger.bind()方法,代码如下。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

先执行new HeaderExchangeHandler(handler),对requestHandler进行一层包装,我们看一下HeaderExchangeHandler对requestHandler的包装方式,就是持有它的引用,代码如下。

public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}

然后再执行new DecodeHandler(new HeaderExchangeHandler(handler))),对HeaderExchangeHandler再进行一次包装,类似地,也是持有HeaderExchangeHandler的引用。最终传递一个DecodeHandler执行Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))方法,继续跟踪代码。

按照dubbo服务发布过程分析可知,会进入NettyTransporter执行如下代码。

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}

进一步进入NettyServer的构造方法,代码如下。

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

会对DecodeHandler进行再一次的包装,进入ChannelHandlers的包装方法,最终执行代码如下。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}

首先获取Dispatcher的适配器,然后调用dispatch()方法,实现Dispatcher接口的扩展类默认只有ExecutionDispatcher,其执行代码如下。

public class ExecutionDispatcher implements Dispatcher {

public static final String NAME = "execution";

public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new ExecutionChannelHandler(handler, url);
}

}

可以看见,又对DecodeHandler进行包装返回一个ExecutionChannelHandler对象。接着对ExecutionChannelHandler包装,返回HeartbeatHandler,然后又一次对HeartbeatHandler包装,返回MultiMessageHandler。

我们回到NettyServer的构造方法,里面调用父类AbstractServer构造,AbstractServer构造又调用父类AbstractEndpoint
构造,AbstractEndpoint构造接着又调用父类AbstractPeer构造,最终设置handler属性只指向MultiMessageHandler,也就是说NettyServer持有MultiMessageHandler引用,NettyServer也继承了ChannelHandler接口。

最后在NettyServer的doOpen方法开启服务时,传入handler作为接受请求处理器,代码如下。

@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}

可以看出,在 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);中进一步对NettyServer进行封装,返回一个NettyHandler作为处理器,加入到Netty的处理调用链中。

最终,当有服务请求时,调用链如下。

每个handler都有对应的职责,我们关注requestHandler里面的处理逻辑,最终调用的是reply
方法。

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//根据channel的参数,从发布的服务中选择出目标服务的代理对象Invoker
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
//获取服务的方法列表字符串
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
//校验有没有这个方法
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//调用服务实例,问题invoker是什么实例?
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

上面代码的getInvoker(channel, inv)方法里面,获取Invoker的逻辑如下。DubboExporter持有invoker对象

String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

显然是从我们服务发布时,记录的exporterMap里面获取到代理对象的,因此可以据此分析我们的Invoker到底是什么实例。
跟踪一遍发布过程,发现在DubboProtocol.export方法里面有如下代码

DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

据此可以知道这个Invoker实例是InvokerDelegete,InvokerDelegete继承自InvokerWrapper,其本身没有覆写父类的invoke方法,因此调用的是InvokerWrapper的方法。

public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}

后续获得的invoker还会有多重包装,这里Invoke调用也是基于调用链,这里不再具体分析,直接画出调用链如下。

最终通过Wrapper代理对象调用service对象的方法。

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: