Dubbo Multi-Version Support: A Partial Source Code Analysis
2017-08-04 16:44
Provider-side analysis
When a service provider starts up, execution reaches DubboProtocol.export, which creates a DubboExporter object and puts it into exporterMap.

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        // export a stub service for dispatching events
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        // (the excerpt ends here; the rest of the method is not shown)
    }
At call time, the DubboExporter is looked up in exporterMap and the invoker is taken from it, so that the real service can be invoked. The key used in exporterMap is built as follows:

    /** serviceGroup/serviceName:serviceVersion:port */
    public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && serviceGroup.length() > 0) {
            buf.append(serviceGroup);
            buf.append("/");
        }
        buf.append(serviceName);
        // the version is part of the key unless it is empty or the default "0.0.0"
        if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
            buf.append(":");
            buf.append(serviceVersion);
        }
        buf.append(":");
        buf.append(port);
        return buf.toString();
    }
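To make the key format concrete, here is a small standalone sketch that copies the logic of serviceKey into a hypothetical ServiceKeyDemo class. The class name, the interface name com.example.DemoService, the group name and port 20880 are assumptions chosen for illustration only.

```java
// Standalone sketch: the method body mirrors the serviceKey excerpt.
// ServiceKeyDemo and the sample service name are illustrative assumptions.
class ServiceKeyDemo {

    /** Builds serviceGroup/serviceName:serviceVersion:port. */
    static String serviceKey(int port, String serviceName,
                             String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && serviceGroup.length() > 0) {
            buf.append(serviceGroup).append("/");
        }
        buf.append(serviceName);
        // An empty version and the default "0.0.0" are both left out of the key.
        if (serviceVersion != null && serviceVersion.length() > 0
                && !"0.0.0".equals(serviceVersion)) {
            buf.append(":").append(serviceVersion);
        }
        buf.append(":").append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        String name = "com.example.DemoService";
        System.out.println(serviceKey(20880, name, "1.0.0", null));
        System.out.println(serviceKey(20880, name, "2.0.0", "groupA"));
        System.out.println(serviceKey(20880, name, "0.0.0", null));
    }
}
```

Two versions of the same interface on the same port therefore get two different keys, which is what lets them coexist in one exporterMap.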
When a call arrives at the provider, it is handled by DubboProtocol's requestHandler. Its reply method only processes remote invocation messages (Invocation):

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) { // only remote invocation messages (Invocation) are handled
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // for a callback, handle the case of a newer version calling an older version
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        // Note: + binds tighter than ==, so the null check below tests the concatenated string, not message.
        throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }
Inside reply, the getInvoker method resolves the invoker from the service key:

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(Constants.PATH_KEY);
        // if this is a callback service on the client side
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }
        // callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        // the version and group sent by the consumer become part of the lookup key
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        /** look up the DubboExporter of the exported service in exporterMap */
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        return exporter.getInvoker(); // return the invoker held by the DubboExporter
    }
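The lookup can be pictured with a much simplified model. The sketch below is not Dubbo code: ExporterLookupDemo, its key helper, and the use of plain strings in place of DubboExporter and Invoker are all assumptions made to keep the example self-contained. It only shows that a request carrying an unknown version finds no entry in the map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the provider-side lookup; not Dubbo code.
// A String naming the implementation stands in for DubboExporter/Invoker.
class ExporterLookupDemo {
    static final Map<String, String> exporterMap = new ConcurrentHashMap<>();

    // Hypothetical, reduced key format: path:version:port (group omitted).
    static String key(String path, String version, int port) {
        return path + ":" + version + ":" + port;
    }

    // Registers an implementation under its versioned key.
    static void export(String path, String version, int port, String impl) {
        exporterMap.put(key(path, version, port), impl);
    }

    // Resolves the implementation for the version carried by the request.
    static String getInvoker(String path, String version, int port) {
        String serviceKey = key(path, version, port);
        String impl = exporterMap.get(serviceKey);
        if (impl == null) {
            throw new IllegalStateException("Not found exported service: " + serviceKey
                    + " in " + exporterMap.keySet() + ", may be version or group mismatch");
        }
        return impl;
    }

    public static void main(String[] args) {
        export("com.example.DemoService", "1.0.0", 20880, "DemoServiceImplV1");
        export("com.example.DemoService", "2.0.0", 20880, "DemoServiceImplV2");
        System.out.println(getInvoker("com.example.DemoService", "2.0.0", 20880));
        try {
            getInvoker("com.example.DemoService", "3.0.0", 20880);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```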
Consumer-side analysis
When a consumer is initialized, RegistryProtocol.doRefer is called:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // build the consumer URL
        // String ipAddr = NetUtils.getLocalHost();
        // if(!ConfUtil.getParamInEnv("HOST").isEmpty()){
        //     ipAddr = ConfUtil.getParamInEnv("HOST");
        // }
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        // URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, ipAddr, 0, type.getName(), directory.getUrl().getParameters());
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        /** subscribe to this URL */
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                + "," + Constants.CONFIGURATORS_CATEGORY
                + "," + Constants.ROUTERS_CATEGORY)); // category (providers,configurators,routers)
        return cluster.join(directory);
    }
Each consumer creates one RegistryDirectory (used to cache invokers, among other things), so the next thing to examine is directory.subscribe:

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // send the subscribe request to the server
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;
            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.dir") + "/.hsf/hsf-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // if check-on-startup is enabled, throw the exception directly
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }
            // record the failed subscription in the failed list and retry it periodically
            addFailedSubscribed(url, listener);
        }
    }
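The control flow of this subscribe method (try the registry, fall back to the cached list, remember the failure for a later retry) can be sketched without any Dubbo types. Everything in the example below is hypothetical: FailbackSubscribeDemo, the Supplier standing in for doSubscribe, and the plain string URLs are assumptions, and the periodic retry itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified sketch of the failback flow; not Dubbo code.
class FailbackSubscribeDemo {
    // Subscriptions that failed and would be retried by a timer in a real registry.
    final List<String> failedSubscribed = new ArrayList<>();

    /**
     * @param doSubscribe stands in for the real registry call and may throw
     * @param cached      provider URLs from a previous successful subscription
     * @param check       whether a failure without cache should abort startup
     */
    List<String> subscribe(String url, Supplier<List<String>> doSubscribe,
                           List<String> cached, boolean check) {
        failedSubscribed.remove(url);
        try {
            return doSubscribe.get();
        } catch (RuntimeException e) {
            if (cached != null && !cached.isEmpty()) {
                // Fall back to the cached list, but still schedule a retry.
                failedSubscribed.add(url);
                return cached;
            }
            if (check) {
                // No cache and check enabled: fail fast, nothing is recorded.
                throw new IllegalStateException("Failed to subscribe " + url, e);
            }
            failedSubscribed.add(url);
            return new ArrayList<>();
        }
    }
}
```

With a non-empty cache the consumer keeps working on stale data; without one, the check flag decides between failing at startup and waiting for the retry.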
Control then passes to the ZookeeperRegistry.doSubscribe method:

    // (the preceding if branch is not shown in this excerpt)
    else {
        List<URL> urls = new ArrayList<URL>();
        for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        List<URL> changeList = toUrlsWithEmpty(url, parentPath, currentChilds);
                        // if(currentChilds.size()>0) updateData(changeList); //cuihs
                        ZookeeperRegistry.this.notify(url, listener, changeList);
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(path, false);
            List<String> children = zkClient.addChildListener(path, zkListener);
            if (children != null) {
                urls.addAll(toUrlsWithEmpty(url, path, children));
            }
        }
        notify(url, listener, urls);
    }
Note the toUrlsWithEmpty method:

    /** If no provider matches the consumer, produce a marker URL with the empty protocol */
    private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
        // when there is nothing to notify, send a marker URL with the empty protocol and a category parameter
        if (urls == null || urls.isEmpty()) {
            int i = path.lastIndexOf('/');
            String category = i < 0 ? path : path.substring(i + 1);
            URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
            urls.add(empty);
        }
        return urls;
    }
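The effect of the marker URL is easier to see in a reduced form. The sketch below replaces Dubbo's URL class with plain strings and matches on the version parameter only. EmptyUrlDemo, the naive versionOf parser, the sample addresses and the shape of the marker string are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// String-based sketch of the "empty marker" idea; not Dubbo code.
class EmptyUrlDemo {

    // Naive extraction of the version query parameter (assumes it is present at most once).
    static String versionOf(String url) {
        int start = url.indexOf("version=");
        if (start < 0) {
            return null;
        }
        start += "version=".length();
        int end = url.indexOf('&', start);
        return end < 0 ? url.substring(start) : url.substring(start, end);
    }

    // Keeps providers whose version equals the consumer's; if none remain,
    // returns a single marker entry carrying the category taken from the path.
    static List<String> toUrlsWithEmpty(String consumerVersion, String path,
                                        List<String> providers) {
        List<String> urls = new ArrayList<>();
        for (String provider : providers) {
            if (consumerVersion.equals(versionOf(provider))) {
                urls.add(provider);
            }
        }
        if (urls.isEmpty()) {
            int i = path.lastIndexOf('/');
            String category = i < 0 ? path : path.substring(i + 1);
            urls.add("empty://consumer?category=" + category);
        }
        return urls;
    }
}
```

A consumer asking for a version that no provider offers thus still receives a notification, but one that tells it the provider list for that category is empty.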
It delegates to toUrlsWithoutEmpty, which in turn calls UrlUtils.isMatch:

    /** Returns the list of provider URLs that match the consumer */
    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
        List<URL> urls = new ArrayList<URL>();
        if (providers != null && providers.size() > 0) {
            for (String provider : providers) {
                provider = URL.decode(provider);
                if (provider.contains("://")) {
                    URL url = URL.valueOf(provider);
                    if (UrlUtils.isMatch(consumer, url)) {
                        urls.add(url);
                    }
                }
            }
        }
        return urls;
    }

    /** Checks that the interface name, group, version and classifier of the two URLs match, and that both are enabled */
    public static boolean isMatch(URL consumerUrl, URL providerUrl) {
        String consumerInterface = consumerUrl.getServiceInterface();
        String providerInterface = providerUrl.getServiceInterface();
        // check that the interface names match
        if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface)))
            return false;
        // check that the categories match
        if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
                consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {
            return false;
        }
        // check that both the consumer and the provider are enabled
        if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)
                && !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
            return false;
        }
        // check that group, version and classifier match
        String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
        String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
        String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
        String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
        String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
        String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
        return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
                && (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
                && (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
    }
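The group and version part of isMatch can be isolated into a few lines. The sketch below is a simplified reimplementation rather than the Dubbo source: VersionMatchDemo and containsValue are hypothetical names, "*" is assumed to be the value of Constants.ANY_VALUE, and the comma handling is a plain split with trimming.

```java
import java.util.Objects;

// Simplified group/version matching; interface, category, enabled
// and classifier checks from isMatch are deliberately left out.
class VersionMatchDemo {
    static final String ANY_VALUE = "*"; // assumed wildcard value

    // True if the comma-separated list in values contains value.
    static boolean containsValue(String values, String value) {
        if (values == null || values.isEmpty() || value == null || value.isEmpty()) {
            return false;
        }
        for (String v : values.split(",")) {
            if (value.equals(v.trim())) {
                return true;
            }
        }
        return false;
    }

    static boolean matches(String consumerGroup, String consumerVersion,
                           String providerGroup, String providerVersion) {
        // The consumer group may be a wildcard, an exact match, or a list containing the provider group.
        boolean groupOk = ANY_VALUE.equals(consumerGroup)
                || Objects.equals(consumerGroup, providerGroup)
                || containsValue(consumerGroup, providerGroup);
        // The version must be a wildcard or an exact match; no list form is accepted.
        boolean versionOk = ANY_VALUE.equals(consumerVersion)
                || Objects.equals(consumerVersion, providerVersion);
        return groupOk && versionOk;
    }

    public static void main(String[] args) {
        System.out.println(matches(null, "1.0.0", null, "1.0.0"));
        System.out.println(matches(null, "1.0.0", null, "2.0.0"));
        System.out.println(matches(null, "*", null, "2.0.0"));
        System.out.println(matches("a,b", "1.0.0", "b", "1.0.0"));
    }
}
```

Under these assumptions the four calls in main evaluate to true, false, true and true: a consumer with version 1.0.0 never sees a 2.0.0 provider, while a consumer with version "*" sees every version.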
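At this point version-based routing is complete: the consumer is only notified of provider URLs whose version matches its own, and on the provider side the version is part of the service key used to look up the exporter.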