dubbo服务引用
2017-12-28 08:26
281 查看
1.dubbo的xml配置文件dubbo:reference 对应的就是ReferenceBean,它实现了FactoryBean,所以在初始化 实例时就会调用他的实现方法,从而开启服务引用
检查并且保存各种配置到Map<String, String> map,然后根据这些生成具体的代理 ref = createProxy(map)
2.加载注册URL后开始暴露具体的refer, invoker = refprotocol.refer(interfaceClass, urls.get(0));
3.创建注册目录保存注册的信息,在zk中创建某个接口的短暂的consumer节点,重置当前注册目录的consumerUrl,在注册中心订阅并创建针对该接口的三个节点providers,configurators,routers并且设置监听器ChildListener监听他们以及子节点的变化,最后通过调用NotifyListener来触发相应的事件,因为注册目录也实现了该接口,所以最开始传进来的NotifyListener就是最开始创建的注册目录类RegistryDirectory
4.把URL信息转化成对应的键值对缓存,为了以后直接调用代理执行接口方法打下基础,将URL列表转成Invoker列表,换方法名映射Invoker列表
/**
* 根据invokerURL列表转换为invoker列表。转换规则如下:
* 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
* 2.如果传入的invoker列表不为空,则表示最新的invoker列表
* 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
* @param invokerUrls 传入的参数不能为null
*/
private void refreshInvoker(List<URL> invokerUrls){
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {
this.forbidden = false; // 允许访问
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
}
if (invokerUrls.size() ==0 ){
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
// state change
//如果计算错误,则不进行处理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
return ;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try{
destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
}catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}这里会根据dubbo协议进行refer生成invoker返回保存起来invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);过滤器和监听器包装
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType
4000
, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}5.准备创建客户端连接,默认是共享连接 ,第一次没有的话会初始化ExchangeClient exchagneclient = initClient(url);获取网络通讯段netty,设置版本心跳编解码器,
最后ExchangeClient client = Exchangers.connect(url ,requestHandler);这块的逻辑和服务暴露是类似的,只不过服务引用是connect,而服务暴露是bind,请求处理器进行
包装,增加编解码,请求头处理
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}通过NettyTransporter实例化NettyClient,然后就是和NettyServer类似的过程,doOpen();打开ClientBootstrap,连接暴露服务的服务端 doConnect();HeaderExchangeChannel,加心跳功能HeaderExchangeClient,最后用ReferenceCountExchangeClient包裹后缓存起来,连接获取完成之后,和接口类型,URL等,对应起来,保存成DubboInvoker返回并添加到invokers里保存.
6.组建集群功能,用MockClusterWrapper包裹 FailoverCluster,UI后返回一个FailoverClusterInvoker,在方法调用是这块会实现负载均衡以及重试,创建服务代理proxyFactory.getProxy(invoker);一样的用JavassistProxyFactory生成具体的代理类
public Object getObject() throws Exception { return get(); }
检查并且保存各种配置到Map<String, String> map,然后根据这些生成具体的代理 ref = createProxy(map)
2.加载注册URL后开始暴露具体的refer, invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 通过注册中心配置拼装URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } }同样经过ProtocolFilterWrapper和ProtocolListenerWrapper包装后,开始获取注册中心,这块和服务暴露一样,
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); return doRefer(cluster, registry, type, url); }
3.创建注册目录保存注册的信息,在zk中创建某个接口的短暂的consumer节点,重置当前注册目录的consumerUrl,在注册中心订阅并创建针对该接口的三个节点providers,configurators,routers并且设置监听器ChildListener监听他们以及子节点的变化,最后通过调用NotifyListener来触发相应的事件,因为注册目录也实现了该接口,所以最开始传进来的NotifyListener就是最开始创建的注册目录类RegistryDirectory
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory); }
public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators if (configuratorUrls != null && configuratorUrls.size() >0 ){ this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && routerUrls.size() >0 ){ List<Router> routers = toRouters(routerUrls); if(routers != null){ // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // 合并override参数 this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && localConfigurators.size() > 0) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); }
4.把URL信息转化成对应的键值对缓存,为了以后直接调用代理执行接口方法打下基础,将URL列表转成Invoker列表,换方法名映射Invoker列表
/**
* 根据invokerURL列表转换为invoker列表。转换规则如下:
* 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
* 2.如果传入的invoker列表不为空,则表示最新的invoker列表
* 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
* @param invokerUrls 传入的参数不能为null
*/
private void refreshInvoker(List<URL> invokerUrls){
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {
this.forbidden = false; // 允许访问
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
}
if (invokerUrls.size() ==0 ){
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
// state change
//如果计算错误,则不进行处理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
return ;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try{
destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
}catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}这里会根据dubbo协议进行refer生成invoker返回保存起来invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);过滤器和监听器包装
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType
4000
, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}5.准备创建客户端连接,默认是共享连接 ,第一次没有的话会初始化ExchangeClient exchagneclient = initClient(url);获取网络通讯段netty,设置版本心跳编解码器,
最后ExchangeClient client = Exchangers.connect(url ,requestHandler);这块的逻辑和服务暴露是类似的,只不过服务引用是connect,而服务暴露是bind,请求处理器进行
包装,增加编解码,请求头处理
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}通过NettyTransporter实例化NettyClient,然后就是和NettyServer类似的过程,doOpen();打开ClientBootstrap,连接暴露服务的服务端 doConnect();HeaderExchangeChannel,加心跳功能HeaderExchangeClient,最后用ReferenceCountExchangeClient包裹后缓存起来,连接获取完成之后,和接口类型,URL等,对应起来,保存成DubboInvoker返回并添加到invokers里保存.
6.组建集群功能,用MockClusterWrapper包裹 FailoverCluster,UI后返回一个FailoverClusterInvoker,在方法调用是这块会实现负载均衡以及重试,创建服务代理proxyFactory.getProxy(invoker);一样的用JavassistProxyFactory生成具体的代理类
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
相关文章推荐
- dubbo 源码学习笔记 (三) —— dubbo引用服务的过程
- 商城项目-dubbo,框架整合,dubbo发布和引用服务
- 学习淘淘商城第十三课(引用dubbo服务)
- dubbo源码分析(一) 服务暴露--服务引用
- 9. Dubbo原理解析-服务引用
- 理解 Dubbo 服务引用
- 商城项目实战11:引用dubbo服务
- dubbo_rpc显露服务和引用服务简析
- Dubbo暴露服务和引用服务的实现源码分析
- dubbo_rpc显露服务和引用服务简析
- dubbo refrence bean(服务引用)
- Dubbo 源码分析 - 服务引用
- dubbo源码解析(十) dubbo服务引用原理
- 关于dubbo创建服务和引用服务时,会报错:cvc-complex-type.2.4.c: The matching wildcard is strict, but no declaration can be found for element 问题的解决
- Dubbo源码学习--服务是如何引用的
- (转)淘淘商城系列——引用dubbo服务
- Dubbo系列-6.服务的引用
- 淘淘商城系列——引用dubbo服务
- dubbo 暴露服务 和引用服务
- dubbo与spring整合使用(服务与引用)