dubbo服务端处理请求源码分析
在duubo服务成功发布之后是如何处理服务调用请求的呢,这块的代码结构简单来说就是基于handler的调用链,每一个handler持有下一个handler的引用,执行完当前handler则调用下一个handler的handle方法。
基于调用链模式,提供对于服务请求的多种处理。回到服务发布的代码,分析handler是何时封装的。在另外一篇dubbo服务发布过程源码分析文章分析服务发布过程中,已经知道在DubboProtocol. createServer方法进行首次的handler传递,用于处理服务请求,在createServer方法有如下代码。
server = Exchangers.bind(url, requestHandler);
进一步分析这个requestHandler的定义,是在DubboProtocol中使用匿名方式创建的实例,如下所示。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override public void connected(Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isInfoEnabled()) { logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } }
Requesthandler作为参数继续传递进行服务发布,在后续还会继续包装别的handler,因此request是整个handler调用链最后一个执行处理器,暂时先不看这里面的方法的作用。继续跟踪server = Exchangers.bind(url, requestHandler),按照上一篇dubbo服务发布过程文章的分析,可以知道会进入到HeaderExchanger.bind()方法,代码如下。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
先执行new HeaderExchangeHandler(handler),对requestHandler进行一层包装,我们看一下HeaderExchangeHandler对requestHandler的包装方式,就是持有它的引用,代码如下。
public HeaderExchangeHandler(ExchangeHandler handler) { if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.handler = handler; }
然后再执行new DecodeHandler(new HeaderExchangeHandler(handler))),对HeaderExchangeHandler再进行一次包装,类似地,也是持有HeaderExchangeHandler的引用。最终传递一个DecodeHandler执行Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))方法,继续跟踪代码。
按照dubbo服务发布过程分析可知,会进入NettyTransporter执行如下代码。
public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
进一步进入NettyServer的构造方法,代码如下。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
会对DecodeHandler进行再一次的包装,进入ChannelHandlers的包装方法,最终执行代码如下。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
首先获取Dispatcher的适配器,然后调用dispatch()方法,实现Dispatcher接口的扩展类默认只有ExecutionDispatcher,其执行代码如下。
public class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new ExecutionChannelHandler(handler, url); } }
可以看见,又对DecodeHandler进行包装返回一个ExecutionChannelHandler对象。接着对ExecutionChannelHandler包装,返回HeartbeatHandler,然后又一次对HeartbeatHandler包装,返回MultiMessageHandler。
我们回到NettyServer的构造方法,里面调用父类AbstractServer构造,AbstractServer构造又调用父类AbstractEndpoint
构造,AbstractEndpoint构造接着又调用父类AbstractPeer构造,最终设置handler属性只指向MultiMessageHandler,也就是说NettyServer持有MultiMessageHandler引用,NettyServer也继承了ChannelHandler接口。
最后在NettyServer的doOpen方法开启服务时,传入handler作为接受请求处理器,代码如下。
@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
可以看出,在 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);中进一步对NettyServer进行封装,返回一个NettyHandler作为处理器,加入到Netty的处理调用链中。
最终,当有服务请求时,调用链如下。
每个handler都有对应的职责,我们关注requestHandler里面的处理逻辑,最终调用的是reply
方法。
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //根据channel的参数,从发布的服务中选择出目标服务的代理对象Invoker Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { //获取服务的方法列表字符串 String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; //校验有没有这个方法 if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //调用服务实例,问题invoker是什么实例? return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
上面代码的getInvoker(channel, inv)方法里面,获取Invoker的逻辑如下。DubboExporter持有invoker对象
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
显然是从我们服务发布时,记录的exporterMap里面获取到代理对象的,因此可以据此分析我们的Invoker到底是什么实例。
跟踪一遍发布过程,发现在DubboProtocol.export方法里面有如下代码
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter);
据此可以知道这个Invoker实例是InvokerDelegete,InvokerDelegete继承自InvokerWrapper,其本身没有覆写父类的invoke方法,因此调用的是InvokerWrapper的方法。
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
后续获得的invoker还会有多重包装,这里Invoke调用也是基于调用链,这里不再具体分析,直接画出调用链如下。
最终通过Wrapper代理对象调用service对象的方法。
阅读更多- zookeeper源码分析之五服务端(集群leader)处理请求流程
- zookeeper源码分析之四服务端(单机)处理请求流程
- Dubbo源码分析(三):Dubbo之服务端(Service)
- Fresco 源码分析(三) Fresco服务端处理(3) DataSource到Producer的适配器逻辑以及BitmapMemoryCacheProducer处理的逻辑
- SpringMVC源码分析(3)DispatcherServlet的请求处理流程
- Spring MVC请求处理流程及源码分析
- Spring MVC请求处理流程及源码分析
- ASP_NET_MVC3_请求处理流程(2) MVC源码分析
- SpringMVC源码分析 DispatcherServlet请求处理过程
- 【Zookeeper】源码分析之请求处理链(二)
- DispatcherServlet 源码分析(六) - HandlerAdapter 处理请求
- memcached 源码分析之请求处理(状态机)
- Struts2源码分析--请求处理
- dubbo请求处理线程模型实现分析
- Spring MVC源码分析(续)——请求处理
- Struts2请求处理流程及源码分析
- Mybatis工作机制源码分析—一次insert请求处理流程
- struts1源码分析(三)请求处理主线
- Spring源码分析: SpringMVC启动流程与DispatcherServlet请求处理流程
- Dubbo源码阅读之 服务端和客户端处理链