Dubbo服务暴露的流程
2017-12-17 17:50
393 查看
在Dubbo服务暴露中,需要被暴露的服务的接口类首先会通过proxyFactory代理工厂获得代理的对象invoker,而暴露之后的服务被调用,也将都会通过这个invoker来调用相关的类。
在dubbo中默认采用javassistProxyFactory来获取由javassist代理的invoker。
首先会根据所要代理的类名产生相应的wrapper来对所要代理的类进行包装。
在getWrapper中,会先根据所要包装的类是否已经被包装过。如果已经包装过,则会直接在来保存已经生成的包装类的map里去寻找,否则会直接生成新的包装类。
makeWrapper()根据传递的类获取新的包装类。
在makeWrapper()方法中,通过javassist根据所需要包装的类动态生成了包装类。由于Invoker的作用,大概在这里动态实现的最重要的方法应该是invokeMothed()方法。if( hasMethod ){
c3.append(" try{");
}
for( Method m : methods )
{
if( m.getDeclaringClass() == Object.class ) //ignore Object's method.
continue;
String mn = m.getName();
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for( Method m2 : methods ) {
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
if (override) {
if (len > 0) {
for (int l = 0; l < len; l ++) {
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
c3.append(" ) { ");
if( m.getReturnType() == Void.TYPE )
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
else
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
c3.append(" }");
mns.add(mn);
if( m.getDeclaringClass() == c )
dmns.add(mn);
ms.put(ReflectUtils.getDesc(m), m);
}
if( hasMethod ){
c3.append(" } catch(Throwable e) { " );
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); " );
c3.append(" }");
}
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
在这段动态生成了invokeMethod的方法体。将会根据方法名调用包装类中真正所需要被调用的类的方法。
大致的invokeMethod()方法如下。
可以见到,当外部代用被包装的类的相应的方法,都会在这里被找到方法并调用。
在得到了包装类之后,在javassistProxyFactory的getInvoker()方法最后会生成新的AbstractProxyInvoker(),重写了doInvoke()方法,会直接调用刚刚生成的包装类的invokeMethod()方法。
在得到invoker之后,会根据获得的invoker开始服务的暴露,通过protocol的export()方法。配置有filter或者listener的情况下,会在这里产生关于具体服务暴露操作的过滤与监听。值得一提的是,如果采用的是registry协议,那么并不会经过ProtocolListenerWrapper的监听,而是直接进入export()方法开始服务的暴露。
以上是registryProtocol的暴露服务export()方法的流程。
首先会通过doLocalExport()方法开始本地服务的暴露。
在本地暴露的过程中,首先会根据传入的invoker获取其提供方的url来获得key,根据获得的key来尝试获得ExporterChangeableWrapper,如果没有,则会先通过原本采用的protocol协议进行服务暴露,默认dubbo。
在dubboProtocol的export()方法中,会根据url生成server key,根据invoker和key生成新的dubboExporter并根据key和exporter存放在map里。但是最关键的是,将在这里调用openServer()方法开启与注册中心的连接。
在openServer中,如果已经建立过与服务器的连接,那么会直接在这里返回,否则会通过netty新建与注册中心的网络连接。
在这里会通过生成ExchangeServer开始监听相应的channel并绑定对应的requestHandler进行相关的回调处理。
可见,在dubboProtocol的服务暴露过程中,完成了对网络监听的配置与开启。
在完成dubboProtocol的export()方法之后,回到RegistryProtocol的doLocalExport()方法,根据dubboProtocol暴露生成的dubboExporter()以及invoker生成新的ExportChangeableWrapper返回。
之后,通过getRegistry获得注册中心的实例。会根据配置的registryFactory生成对应的registry实例。
以zookeeper为例,在其构造方法当中会根据url新生成一个zkClient得到与zookeeper注册中心的连接。
在得到了注册中心的实例之后,通过register()方法正式注册服务到注册中心当中去。
在调用注册方法的过程中,被注册的url将会在zookeeperRegistry的父类的父类abstractRegistry中保存被注册的url。然后在zookeeperRegistry的父类当中调用zookeeperRegistry的doRegistry()方法。也就说在zookeeperRegistry的doRegistry方法当中,会将要暴露的服务信息提交给注册中心。
之后,会向注册中心订阅这一服务,以保证注册数据变动时的自动推送。
在dubbo中默认采用javassistProxyFactory来获取由javassist代理的invoker。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
首先会根据所要代理的类名产生相应的wrapper来对所要代理的类进行包装。
在getWrapper中,会先根据所要包装的类是否已经被包装过。如果已经包装过,则会直接在来保存已经生成的包装类的map里去寻找,否则会直接生成新的包装类。
makeWrapper()根据传递的类获取新的包装类。
在makeWrapper()方法中,通过javassist根据所需要包装的类动态生成了包装类。由于Invoker的作用,大概在这里动态实现的最重要的方法应该是invokeMothed()方法。if( hasMethod ){
c3.append(" try{");
}
for( Method m : methods )
{
if( m.getDeclaringClass() == Object.class ) //ignore Object's method.
continue;
String mn = m.getName();
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for( Method m2 : methods ) {
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
if (override) {
if (len > 0) {
for (int l = 0; l < len; l ++) {
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
c3.append(" ) { ");
if( m.getReturnType() == Void.TYPE )
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
else
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
c3.append(" }");
mns.add(mn);
if( m.getDeclaringClass() == c )
dmns.add(mn);
ms.put(ReflectUtils.getDesc(m), m);
}
if( hasMethod ){
c3.append(" } catch(Throwable e) { " );
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); " );
c3.append(" }");
}
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
在这段动态生成了invokeMethod的方法体。将会根据方法名调用包装类中真正所需要被调用的类的方法。
大致的invokeMethod()方法如下。
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { dubbo.provider.hello.service.impl.HelloWorld w; try { w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException(e); } try { if ("helloWorld".equals($2) && $3.length == 0) { w. helloWorld (); return null; } if ("getName".equals($2) && $3.length == 0) { return ($w)w. getNanme (); } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl."); }
可以见到,当外部代用被包装的类的相应的方法,都会在这里被找到方法并调用。
在得到了包装类之后,在javassistProxyFactory的getInvoker()方法最后会生成新的AbstractProxyInvoker(),重写了doInvoke()方法,会直接调用刚刚生成的包装类的invokeMethod()方法。
在得到invoker之后,会根据获得的invoker开始服务的暴露,通过protocol的export()方法。配置有filter或者listener的情况下,会在这里产生关于具体服务暴露操作的过滤与监听。值得一提的是,如果采用的是registry协议,那么并不会经过ProtocolListenerWrapper的监听,而是直接进入export()方法开始服务的暴露。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { a550 exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
以上是registryProtocol的暴露服务export()方法的流程。
首先会通过doLocalExport()方法开始本地服务的暴露。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter; }
在本地暴露的过程中,首先会根据传入的invoker获取其提供方的url来获得key,根据获得的key来尝试获得ExporterChangeableWrapper,如果没有,则会先通过原本采用的protocol协议进行服务暴露,默认dubbo。
在dubboProtocol的export()方法中,会根据url生成server key,根据invoker和key生成新的dubboExporter并根据key和exporter存放在map里。但是最关键的是,将在这里调用openServer()方法开启与注册中心的连接。
在openServer中,如果已经建立过与服务器的连接,那么会直接在这里返回,否则会通过netty新建与注册中心的网络连接。
private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
在这里会通过生成ExchangeServer开始监听相应的channel并绑定对应的requestHandler进行相关的回调处理。
可见,在dubboProtocol的服务暴露过程中,完成了对网络监听的配置与开启。
在完成dubboProtocol的export()方法之后,回到RegistryProtocol的doLocalExport()方法,根据dubboProtocol暴露生成的dubboExporter()以及invoker生成新的ExportChangeableWrapper返回。
之后,通过getRegistry获得注册中心的实例。会根据配置的registryFactory生成对应的registry实例。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
以zookeeper为例,在其构造方法当中会根据url新生成一个zkClient得到与zookeeper注册中心的连接。
在得到了注册中心的实例之后,通过register()方法正式注册服务到注册中心当中去。
在调用注册方法的过程中,被注册的url将会在zookeeperRegistry的父类的父类abstractRegistry中保存被注册的url。然后在zookeeperRegistry的父类当中调用zookeeperRegistry的doRegistry()方法。也就说在zookeeperRegistry的doRegistry方法当中,会将要暴露的服务信息提交给注册中心。
之后,会向注册中心订阅这一服务,以保证注册数据变动时的自动推送。
相关文章推荐
- Dubbo 服务暴露注册流程
- dubbo服务原始暴露流程
- Dubbo服务再暴露
- Dubbo源码解析 —— 服务暴露总结
- dubbo源码学习(五)dubbo暴露服务的过程
- Dubbo源码解析 —— 服务暴露总结
- Dubbo 服务暴露与引用
- SSM(十三) 将dubbo暴露出HTTP服务
- Dubbo 服务暴露细节
- Dubbo源码解析 —— 服务暴露总结
- Dubbo源码分析之三:服务的暴露
- dubbo源码解析(六) dubbo服务发布过程及本地暴露
- Dubbo框架初探【用Spring配置声明暴露服务(可以使用multicast广播注册中心暴露服务地址或者使用zookeeper注册中心暴露服务地址)、加载Spring配置,启动服务】
- dubbo 暴露服务 和引用服务
- Dubbo/Dubbox的服务暴露(二)-扩展点机制
- Dubbo源码解析 —— 服务暴露总结
- dubbo服务暴露过程源码分析
- Dubbo——各协议暴露和引用服务的逻辑
- dubbo源码分析10——服务暴露1_export()方法分析
- Dubbo使用multicast广播注册中心暴露服务地址时启动报错empty notify