您的位置:首页 > 其它

dubbo源码分析-consumer端2-创建注册中心

2017-10-09 10:21 483 查看
转载: http://blog.csdn.net/youaremoon/article/details/50731139
前面一篇文章我们分析了consumer代理的生成。在代理生成的过程中,会创建注册服务(com.alibaba.dubbo.registry.Registry)。通过注册服务提供url注册、订阅、查找的功能。

[java] view
plain copy

public interface RegistryService {

/**

* 注册数据,比如:提供者地址,消费者地址,路由规则,覆盖规则,等数据。

*

* 注册需处理契约:<br>

* 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>

* 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>

* 3. 当URL设置了category=routers时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>

* 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>

* 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>

*

* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin

*/

void register(URL url);

/**

* 取消注册.

*

* 取消注册需处理契约:<br>

* 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>

* 2. 按全URL匹配取消注册。<br>

*

* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin

*/

void unregister(URL url);

/**

* 订阅符合条件的已注册数据,当有注册数据变更时自动推送.

*

* 订阅需处理契约:<br>

* 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>

* 2. 当URL设置了category=routers,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>

* 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>

* 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>

* 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br
20000
>

* 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>

* 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>

*

* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin

* @param listener 变更事件监听器,不允许为空

*/

void subscribe(URL url, NotifyListener listener);

/**

* 取消订阅.

*

* 取消订阅需处理契约:<br>

* 1. 如果没有订阅,直接忽略。<br>

* 2. 按全URL匹配取消订阅。<br>

*

* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin

* @param listener 变更事件监听器,不允许为空

*/

void unsubscribe(URL url, NotifyListener listener);

/**

* 查询符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。

*

* @see com.alibaba.dubbo.registry.NotifyListener#notify(List)

* @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin

* @return 已注册信息列表,可能为空,含义同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。

*/

List<URL> lookup(URL url);

}

本机测试的时候我们可以选择Multicase注册中心,但这种方式受网络结构限制,只适合小规模应用或开发阶段使用,实际线上环境官方推荐Zookeeper中心。因此我们主要分析Zookeeper注册中心的实现。

上一篇文章我们讲到Registry的创建是通过“Registry registry = registryFactory.getRegistry(url);” 这里的registryFactory根据不同的protocol而不同,我们来看看ZookeeperRegistryFactory的实现,ZookeeperRegistryFactory继承自AbstractRegistryFactory,其getRegistry方法代码如下:

[java] view
plain copy

public Registry getRegistry(URL url) {

// 将path和interface都设置成com.alibaba.dubbo.registry.RegistryService

url = url.setPath(RegistryService.class.getName())

.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())

.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);

// 根据url生产serviceString,格式为protocol://[username:password@]ip:port/[serviceKey或path]

// 例如:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService

String key = url.toServiceString();

// 锁定注册中心获取过程,保证注册中心单一实例

LOCK.lock();

try {

Registry registry = REGISTRIES.get(key);

if (registry != null) {

return registry;

}

// 根据url创建registry

registry = createRegistry(url);

if (registry == null) {

throw new IllegalStateException("Can not create registry " + url);

}

REGISTRIES.put(key, registry);

return registry;

} finally {

// 释放锁

LOCK.unlock();

}

}

// 这段在ZookeeperRegistryFactory中

// 直接使用url创建ZookeeperRegistry, zookeeperTransporter对zk的操作进行了封装,目前提供了zkclient和curator两种实现,默认为zkclient

public Registry createRegistry(URL url) {

return new ZookeeperRegistry(url, zookeeperTransporter);

}

目前zookeeperTransporter的修改方式有很多种:

spring配置:<dubbo:registry ... client="curator" />

系统参数:dubbo.registry.client=curator

注册链接:zookeeper://10.20.153.10:2181?client=curator

通过ZookeeperRegistryFactory创建了一个ZookeeperRegistry,来看看这个类的构造方法:

[java] view
plain copy

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {

super(url);

if (url.isAnyHost()) {

throw new IllegalStateException("registry address == null");

}

// 获取group,默认为dubbo,默认的root为/root

String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);

if (! group.startsWith(Constants.PATH_SEPARATOR)) {

group = Constants.PATH_SEPARATOR + group;

}

this.root = group;

// 通过transporter创建连接

zkClient = zookeeperTransporter.connect(url);

// 添加状态变更的事件监听器。注意这里只是添加了监听器,但并没有像zk注册。

zkClient.addStateListener(new StateListener() {

public void stateChanged(int state) {

if (state == RECONNECTED) {

try {// 重连后执行recover方法

recover();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

});

}

// ZookeeperRegistry的父类构造方法

public FailbackRegistry(URL url) {

super(url);

int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

// 添加一个任务,默认5秒后开始,每5秒钟进行一次连接检测

this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {

public void run() {

// 检测并连接注册中心

try {

retry();

} catch (Throwable t) { // 防御性容错

logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);

}

}

}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);

}

// FailbackRegistry 的父类构造方法

public AbstractRegistry(URL url) {

// 设置registryUrl

setUrl(url);

syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);

// 根据host生产注册信息的缓存文件地址

String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");

File file = null;

if (ConfigUtils.isNotEmpty(filename)) {

file = new File(filename);

if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){

if(! file.getParentFile().mkdirs()){

throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");

}

}

}

this.file = file;

// 从file中加载配置到内存中

loadProperties();

// url变更通知,getBackupUrls通过url生成所有链接,如zookeeper://10.20.153.10:2181?backup=10.20.153.11:2181,10.20.153.12:2181,最终生成的链接:

// zookeeper://10.20.153.10:2181/com.alibaba.dubbo.registry.RegistryService?xxx

// zookeeper://10.20.153.11:2181/com.alibaba.dubbo.registry.RegistryService?xxxzookeeper://10.20.153.12:2181/com.alibaba.dubbo.registry.RegistryService?xxx

notify(url.getBackupUrls());

}

可以看到,ZookeeperRegistry初始化主要有以下操作:

1、 获取缓存文件路径,并从该文件加载数据到内存,将注册地址拆分成多个地址(backup的情况有多个地址);

2、 创建一个定时任务,定时对失败的操作进行重试;

3、通过transporter创建连接,并添加一个状态改变的监听器。

创建连接的过程(curator):

connect(url)执行代码如下

[java] view
plain copy

public class CuratorZookeeperTransporter implements ZookeeperTransporter {

public ZookeeperClient connect(URL url) {

return new CuratorZookeeperClient(url);

}

}

[java] view
plain copy

public CuratorZookeeperClient(URL url) {

super(url);

try {

// 设置connectString, 这里的backupAddress包括原地址和备用地址,最终得到字符串形式:ip0:port0,ip1:port1...

Builder builder = CuratorFrameworkFactory.builder()

.connectString(url.getBackupAddress())

.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))

.connectionTimeoutMs(5000);

String authority = url.getAuthority();

if (authority != null && authority.length() > 0) {

builder = builder.authorization("digest", authority.getBytes());

}

client = builder.build();

// 注册连接状态改变事件的监听器,当状态变更时调用stateChanged方法

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

public void stateChanged(CuratorFramework client, ConnectionState state) {

if (state == ConnectionState.LOST) {

CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);

} else if (state == ConnectionState.CONNECTED) {

CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);

} else if (state == ConnectionState.RECONNECTED) {

CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);

}

}

});

client.start();

} catch (IOException e) {

throw new IllegalStateException(e.getMessage(), e);

}

}

上面又出现一个addListener, 这个和前面的zkClient.addStateListener有啥区别呢? addListener是注册了状态变更的监听器,也就是状态变更时会回匿名ConnectionStateListener中的stateChange方法,stateChange再调用client中的stateChange方法,而stateChange方法调用的正是addStateListener添加的listener。

到这里Registry的初始化工作完成,接下来在创建RegistryDirectory后会调用registry.register方法来进行注册:

[java] view
plain copy

// 父类FailbackRegistry中

public void register(URL url) {

super.register(url);

failedRegistered.remove(url);

failedUnregistered.remove(url);

4000
try {

// 向服务器端发送注册请求

doRegister(url);

} catch (Exception e) {

Throwable t = e;

// 如果开启了启动时检测,则直接抛出异常

boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)

&& url.getParameter(Constants.CHECK_KEY, true)

&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());

boolean skipFailback = t instanceof SkipFailbackWrapperException;

if (check || skipFailback) {

if(skipFailback) {

t = t.getCause();

}

throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);

} else {

logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);

}

// 将失败的注册请求记录到失败列表,定时重试

failedRegistered.add(url);

}

}

doRegister的实现如下:

[java] view
plain copy

protected void doRegister(URL url) {

try {

// /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/xxxxxx , true

zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

} catch (Throwable e) {

throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

}

}

public void create(String path, boolean ephemeral) {

// 从最顶层开始创建持久化节点,最后一层是非持久化节点(ephemeral=true)

int i = path.lastIndexOf('/');

if (i > 0) {

create(path.substring(0, i), false);

}

if (ephemeral) {

createEphemeral(path);

} else {

createPersistent(path);

}

}

toUrlPath(url)得到的路径格式为:/group(默认为dubbo)/(interfaceName)/consumers/xxx, 这也是在zookeeper中的层次结构。注册完成后在monitor中可以看到此consumer。

注册完自身后,还需要订阅provider的信息,调用方式为directory.subscribe,RegistryDirectory实现代码:

[java] view
plain copy

public void subscribe(URL url) {

setConsumerUrl(url);

// RegistryDirectory实现了NotifyListener

registry.subscribe(url, this);

}

registry.subscribe方法在抽象类FailbackRegistry中:

[java] view
plain copy

public void subscribe(URL url, NotifyListener listener) {

// 添加listener到url对应的集合中

super.subscribe(url, listener);

// 从失败的订阅集合中移除该listener

removeFailedSubscribed(url, listener);

try {

// 向服务器端发送订阅请求

doSubscribe(url, listener);

} catch (Exception e) {

Throwable t = e;

// urls为文件缓存中的地址

List<URL> urls = getCacheUrls(url);

if (urls != null && urls.size() > 0) {

// 订阅失败则使用缓存中的url

notify(url, listener, urls);

logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);

} else {

// 如果开启了启动时检测,则直接抛出异常

boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)

&& url.getParameter(Constants.CHECK_KEY, true);

boolean skipFailback = t instanceof SkipFailbackWrapperException;

if (check || skipFailback) {

if(skipFailback) {

t = t.getCause();

}

throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);

} else {

logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);

}

}

// 将失败的订阅请求记录到失败列表,定时重试

addFailedSubscribed(url, listener);

}

}

doSubscribe方法在ZookeeperRegistry中:

[java] view
plain copy

protected void doSubscribe(final URL url, final NotifyListener listener) {

try {

if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {

// 这段先不讲

。。。

} else {

List<URL> urls = new ArrayList<URL>();

// /dubbo/com.alibaba.dubbo.demo.DemoService/providers,

// /dubbo/com.alibaba.dubbo.demo.DemoService/configurators,

// /dubbo/com.alibaba.dubbo.demo.DemoService/routers

for (String path : toCategoriesPath(url)) {

// 添加子节点变更事件处理

ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);

if (listeners == null) {

zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());

listeners = zkListeners.get(url);

}

ChildListener zkListener = listeners.get(listener);

if (zkListener == null) {

listeners.putIfAbsent(listener, new ChildListener() {

public void childChanged(String parentPath, List<String> currentChilds) {

ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));

}

});

zkListener = listeners.get(listener);

}

// 创建持久化的节点

zkClient.create(path, false);

// 创建监听,如果节点下有数据则会返回节点下数据;

// 如对于providers节点,会返回对应接口下已经注册的provider url,相当于此处可以拿到服务端的连接信息

List<String> children = zkClient.addChildListener(path, zkListener);

if (children != null) {

urls.addAll(toUrlsWithEmpty(url, path, children));

}

}

// 通知变更

notify(url, listener, urls);

}

} catch (Throwable e) {

throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

}

}

notify方法主要有两个功能,一是将变更的url存入缓存文件中,二是调用listener.notify方法。步骤二的listener是RegistryDirectory,因此代码又执行到RegistryDirectory的notify方法:

[java] view
plain copy

public synchronized void notify(List<URL> urls) {

List<URL> invokerUrls = new ArrayList<URL>();

List<URL> routerUrls = new ArrayList<URL>();

List<URL> configuratorUrls = new ArrayList<URL>();

for (URL url : urls) {

&nbs
2a74f
p; String protocol = url.getProtocol();

String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

if (Constants.ROUTERS_CATEGORY.equals(category)

|| Constants.ROUTE_PROTOCOL.equals(protocol)) {

routerUrls.add(url);

} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)

|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {

configuratorUrls.add(url);

} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {

invokerUrls.add(url);

} else {

logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());

}

}

// configurators

if (configuratorUrls != null && configuratorUrls.size() >0 ){

this.configurators = toConfigurators(configuratorUrls);

}

// routers

if (routerUrls != null && routerUrls.size() >0 ){

List<Router> routers = toRouters(routerUrls);

if(routers != null){ // null - do nothing

setRouters(routers);

}

}

List<Configurator> localConfigurators = this.configurators; // local reference

// 合并override参数

this.overrideDirectoryUrl = directoryUrl;

if (localConfigurators != null && localConfigurators.size() > 0) {

for (Configurator configurator : localConfigurators) {

this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);

}

}

// providers

refreshInvoker(invokerUrls);

}

/**

* 根据invokerURL列表转换为invoker列表。转换规则如下:

* 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用

* 2.如果传入的invoker列表不为空,则表示最新的invoker列表

* 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。

* @param invokerUrls 传入的参数不能为null

*/

private void refreshInvoker(List<URL> invokerUrls){

if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null

&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

this.forbidden = true; // 禁止访问

this.methodInvokerMap = null; // 置空列表

destroyAllInvokers(); // 关闭所有Invoker

} else {

this.forbidden = false; // 允许访问

Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){

invokerUrls.addAll(this.cachedInvokerUrls);

} else {

this.cachedInvokerUrls = new HashSet<URL>();

this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比

}

if (invokerUrls.size() ==0 ){

return;

}

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表

Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表

// state change

//如果计算错误,则不进行处理.

if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){

logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));

return ;

}

this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;

this.urlInvokerMap = newUrlInvokerMap;

try{

destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker

}catch (Exception e) {

logger.warn("destroyUnusedInvokers error. ", e);

}

}

}

此方法中包含了一个toInvokers方法,该方法通过invokerUrls创建对应的Invoker,并放入newUrlInvokerMap,而暴露的方法名对应invoker则放入newMethodInvokerMap中。此时客户端需要的信息都已经加载。因此toInvokers方法是比较关键的:

[java] view
plain copy

/**

* 将urls转成invokers,如果url已经被refer过,不再重新引用。

*

* @param urls

* @param overrides

* @param query

* @return invokers

*/

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {

Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();

if(urls == null || urls.size() == 0){

return newUrlInvokerMap;

}

Set<String> keys = new HashSet<String>();

String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);

for (URL providerUrl : urls) {

//如果reference端配置了protocol,则只选择匹配的protocol

if (queryProtocols != null && queryProtocols.length() >0) {

boolean accept = false;

String[] acceptProtocols = queryProtocols.split(",");

for (String acceptProtocol : acceptProtocols) {

if (providerUrl.getProtocol().equals(acceptProtocol)) {

accept = true;

break;

}

}

if (!accept) {

continue;

}

}

if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {

continue;

}

// 加载provider声明的protocol,如果加载不到则报错(默认protocol=dubbo, 实现为com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol)

if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {

logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()

+ ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));

continue;

}

// 合并url参数 顺序为override > -D >Consumer > Provider

// 并且加入不检查连接是否成功的参数,总是创建Invoker!

URL url = mergeUrl(providerUrl);

String key = url.toFullString(); // URL参数是排序的

if (keys.contains(key)) { // 重复URL

continue;

}

keys.add(key);

// 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer

Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference

Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

if (invoker == null) { // 缓存中没有,重新refer

try {

boolean enabled = true;

if (url.hasParameter(Constants.DISABLED_KEY)) {

enabled = ! url.getParameter(Constants.DISABLED_KEY, false);

} else {

enabled = url.getParameter(Constants.ENABLED_KEY, true);

}

if (enabled) {

// 创建InvokerDelegete,主要用于存储注册中心下发的url地址,用于重新重新refer时能够根据providerURL queryMap overrideMap重新组装

invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);

}

} catch (Throwable t) {

logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);

}

if (invoker != null) { // 将新的引用放入缓存

newUrlInvokerMap.put(key, invoker);

}

}else {

newUrlInvokerMap.put(key, invoker);

}

}

keys.clear();

return newUrlInvokerMap;

}

回顾一下整个过程:
1、通过url创建Registry;

2、从本地的缓存文件加载之前订阅到的数据;

3、Registry通过url包含的地址连接到注册中心;

4、添加一个定时任务,该任务5s后执行,每5s执行一次,执行的内容为:对(注册/取消注册/订阅/取消订阅/通知)失败的列表进行重试;

5、添加一个状态变更的事件监听器,当连接断开后,加入到注册失败的url列表中;

6、将自身的连接注册到consumer节点下,供管理中心查询;

7、订阅接口下的其他节点(providers/configurators/routers)的变更,并获取其已有值;

8、将7中获取到的对应providers节点下的值保存到本地的缓存文件中,这些值就是服务端的连接信息;

9、通过服务端的连接信息创建Invoker;

Invoker的创建流程比较复杂,我们下一篇文章单独介绍。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: