您的位置:首页 > 其它

dubbo解析-详解配置中心

2020-06-28 04:24 435 查看

本文基于dubbo 2.7.5版本代码

文章目录

  • 三、读取配置中心的配置
  • 四、监听配置中心
  • 五、ServiceNameMappingListener
  • 一、总述

    dubbo支持多种形式的配置中心,包括zk,consul,apollo等。dubbo也提供了访问各种配置中心的实现类,实现类如下:

    这些实现类的均实现自接口DynamicConfiguration,可以将dubbo需要的几乎所有的配置都放在配置中心,同时dubbo还提供了一些监听器,当配置中心的数据发生变化,这些监听器会监听到数据变化进而修改本地配置。另外注意ServiceNameMappingListener监听器,dubbo服务端在启动的时候将服务的接口名、group、version、协议等信息通过该监听器发布到配置中心。
    本文将以zk作为配置中心进行介绍。

    二、ConfigCenterBean

    ConfigCenterBean用于设置如何连接配置中心,可以设置配置中心地址、连接超时时间、是否优先级最高等。还可以设置连接配置中心的用户名、密码。
    在spring配置文件中以dubbo.config-centers或者dubbo.config-center为前缀可以设置ConfigCenterBean对象的值。

    1、includeSpringEnv属性

    includeSpringEnv属性需要注意:
    includeSpringEnv用于设置是否需要从spring的Environment对象中获取配置信息。默认为false。ConfigCenterBean实现了EnvironmentAware接口,那么在启动的时候将会调用setEnvironment方法,代码如下:

    public void setEnvironment(Environment environment) {
    if (includeSpringEnv) {
    // Get PropertySource mapped to 'dubbo.properties' in Spring Environment.
    setExternalConfig(getConfigurations(getConfigFile(), environment));
    // Get PropertySource mapped to 'application.dubbo.properties' in Spring Environment.
    setAppExternalConfig(getConfigurations(StringUtils.isNotEmpty(getAppConfigFile()) ? getAppConfigFile() : ("application." + getConfigFile()), environment));
    }
    }

    如果includeSpringEnv设置为true,那么将从Environment对象中读取dubbo.properties和application.dubbo.properties的值,并分别设置给属性externalConfiguration和appExternalConfiguration。也可以修改从Environment对象中读取其他的值,设置configFile和appConfigFile两个属性即可。
    根据我的分析以及对spring的理解,如果想在Environment对象中设置dubbo.properties和application.dubbo.properties的值,需要我们编写相关代码,将这两个值设置到Environment对象。也可以通过其他方式,比如jndi,但是相对来说比较复杂。
    那么这里设置属性externalConfiguration和appExternalConfiguration有什么用呐?
    前面文章《dubbo解析-DubboBootstrap在dubbo中的作用》提到在DubboBootstrap中会对Environment初始化,初始化的时候将ConfigCenterBean的externalConfiguration和appExternalConfiguration的值设置到Environment对象的externalConfigurationMap和appExternalConfigurationMap。代码如下:

    public void initialize() throws IllegalStateException {
    ConfigManager configManager = ApplicationModel.getConfigManager();
    Optional<Collection<ConfigCenterConfig>> defaultConfigs = configManager.getDefaultConfigCenter();
    defaultConfigs.ifPresent(configs -> {
    for (ConfigCenterConfig config : configs) {
    //externalConfigurationMap和appExternalConfigurationMap是Map对象
    this.setExternalConfigMap(config.getExternalConfiguration());
    this.setAppExternalConfigMap(config.getAppExternalConfiguration());
    }
    });
    }

    从代码中可以看出,Environment对象只保存了最后一个ConfigCenterBean的externalConfiguration和appExternalConfiguration的值。之后代码也会修改externalConfigurationMap和appExternalConfigurationMap,但是只是在此基础上更新,比如新增一个key和value,修改value值。

    三、读取配置中心的配置

    dubbo在启动的时候读取配置中心的配置数据。下面的分析以zk作为配置中心为例。同时zk也是默认的配置中心。
    DubboBootstrap初始化(initialize方法)时要调用startConfigCenter方法,该方法代码如下:

    private void startConfigCenter() {
    //获取所有ConfigCenterConfig对象,ConfigCenterBean是其子类,其实这个位置获得是ConfigCenterBean对象
    Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();
    if (CollectionUtils.isNotEmpty(configCenters)) {
    CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration();
    //遍历每个ConfigCenterBean对象
    for (ConfigCenterConfig configCenter : configCenters) {
    //使用java系统属性等设置ConfigCenterBean对象的属性
    configCenter.refresh();
    //校验ConfigCenterBean对象的parameters是否合法
    ConfigValidationUtils.validateConfigCenterConfig(configCenter);
    //prepareEnvironment方法建立与配置中心的连接,拉取配置数据,并保存到本地
    compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
    }
    environment.setDynamicConfiguration(compositeDynamicConfiguration);
    }
    //使用配置中心的配置更新如下对象的属性:
    //ApplicationConfig、MonitorConfig、ModuleConfig、ProtocolConfig、RegistryConfig、
    //ProviderConfig、ConsumerConfig。
    //更新对象属性时,使用如下规则搜索配置:dubbo.类名去掉Config.id值.属性名,
    //比如更新id为“test”的ProviderConfig的threads属性时,
    //从配置中心搜索key=dubbo.provider.test.threads,如果能找到就更新threads属性。
    configManager.refreshAll();
    }
    private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) {
    //ConfigCenterConfig必须配置address,否则为无效
    if (configCenter.isValid()) {
    if (!configCenter.checkOrUpdateInited()) {
    return null;
    }
    //构建连接配置中心的url,url中包含了ip、端口、协议等
    //getDynamicConfiguration根据url的协议使用SPI创建DynamicConfiguration对象
    //DynamicConfiguration对象建立与配置中心的连接
    DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());//从配置中心拉取key=dubbo.properties,group=dubbo的值(这两个值都是默认值,我们可以通过修改属性configFile来修改key)
    String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());String appGroup = getApplication().getName();
    String appConfigContent = null;
    if (isNotEmpty(appGroup)) {
    //从配置中心拉取应用配置,group是应用名,key是appConfigFile的值,
    //如果appConfigFile=null,则使用configFile,
    //默认是使用configFile的值,也就是dubbo.properties
    appConfigContent = dynamicConfiguration.getProperties
    (isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(),
    appGroup
    );
    }
    try {
    environment.setConfigCenterFirst(configCenter.isHighestPriority())
    //将配置信息保存到Environment的Map属性中,后面的配置会覆盖之前的
    environment.updateExternalConfigurationMap(parseProperties(configContent));
    environment.updateAppExternalConfigurationMap(parseProperties(appConfigContent));
    } catch (IOException e) {
    throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
    }
    return dynamicConfiguration;
    }
    return null;
    }

    上面的代码主要目的是连接配置中心,然后从配置中心拉取指定key和group的配置数据,并保存到本地的Map对象中,最后使用这些配置数据更新dubbo的XXXConfig对象属性。
    上面代码会拉取两个配置数据,一个是configFile指定的,另一个是appConfigFile,拉取的第一个配置数据,可以认为是全局配置,第二个配置数据可以认为是指定应用的独特配置,而且会覆盖第一个配置数据。但是两个配置优先级都低于java应用配置(java应用配置是指通过System.getProperty获取的)。默认情况下,两个配置数据在配置中心的路径如下图(下图来源于官网):

    • namespace,用于不同配置的环境隔离。
    • config,Dubbo约定的固定节点,不可更改,所有配置和服务治理规则都存储在此节点下。
    • dubbo/application,分别用来隔离全局配置、应用级别配置:dubbo是默认group值,application对应应用名
    • dubbo.properties,此节点的node value存储具体配置内容

    prepareEnvironment方法使用如下代码建立与配置中心的连接,下面我们详细分析一下连接是如何建立的:

    DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());

    我们来看方法getDynamicConfiguration:

    static DynamicConfiguration getDynamicConfiguration(URL connectionURL) {
    //获取连接配置中心使用的协议,下面分析以zk为例
    String protocol = connectionURL.getProtocol();
    //使用SPI加载DynamicConfigurationFactory对象,其支持的协议以及对应的类可以参见文件
    //org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory。
    DynamicConfigurationFactory factory = getDynamicConfigurationFactory(protocol);
    //使用DynamicConfigurationFactory创建DynamicConfiguration
    return factory.getDynamicConfiguration(connectionURL);
    }
    static DynamicConfigurationFactory getDynamicConfigurationFactory(String name) {
    Class<DynamicConfigurationFactory> factoryClass = DynamicConfigurationFactory.class;
    ExtensionLoader<DynamicConfigurationFactory> loader = getExtensionLoader(factoryClass);
    return loader.getOrDefaultExtension(name);
    }

    如果使用的配置中心是zk,那么DynamicConfigurationFactory的实现类是ZookeeperDynamicConfigurationFactory,代码如下:

    public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
    private ZookeeperTransporter zookeeperTransporter;
    //使用SPI创建ZookeeperDynamicConfigurationFactory对象时调用的该方法
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
    this.zookeeperTransporter = zookeeperTransporter;
    }
    @Override
    protected DynamicConfiguration createDynamicConfiguration(URL url) {
    return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
    }
    }

    ZookeeperDynamicConfiguration的构造方法如下:

    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
    this.url = url;
    //构建访问配置中心的根路径,默认是:/dubbo/config/
    rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
    initializedLatch = new CountDownLatch(1);
    //创建监听器,监听器后面分析
    this.cacheListener = new CacheListener(rootPath, initializedLatch);
    //创建单个线程,用于处理被监听的事件
    this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
    //建立与配置中心的连接
    zkClient = zookeeperTransporter.connect(url);
    //设置监听器和监听目录
    zkClient.addDataListener(rootPath, cacheListener, executor);try {
    //可以通过ConfigCenterBean的parameters设置init.timeout的值,init.timeout表示建立链接的超时时间
    long timeout = url.getParameter("init.timeout", 5000);
    boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
    if (!isCountDown) {
    throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
    + url + " is correct");
    }
    } catch (InterruptedException e) {
    logger.warn("Failed to build local cache for config center (zookeeper)." + url);
    }
    }

    建立链接后,便可以使用下面的代码获取配置了:

    String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());

    通过key和group得到配置,对于zk来说,访问的目录是rootPath/group/key,其中rootPath的默认值是/dubbo/config/,group如果没有设置值的话,使用默认值:dubbo。

    四、监听配置中心

    建立与配置中心链接时,在ZookeeperDynamicConfiguration的构造方法中设置监听器CacheListener。代码如下:

    zkClient.addDataListener(rootPath, cacheListener, executor);

    cacheListener将监听rootPath路径。我们来看一下zkClient.addDataListener方法:

    //该方法用于设置监听zk的指定目录
    public void addDataListener(String path, DataListener listener, Executor executor) {
    //listeners是一个两层map对象,类型如下:
    //ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>>
    //最外层的map,key是被监听的路径,内层的map,key和value都是监听器,
    //其区别是value可以认为是对key的封装,在本代码中key是CacheListener,value是CuratorWatcherImpl。
    ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path);
    if (dataListenerMap == null) {
    listeners.putIfAbsent(path, new ConcurrentHashMap<DataListener, TargetDataListener>());
    dataListenerMap = listeners.get(path);
    }
    TargetDataListener targetListener = dataListenerMap.get(listener);
    if (targetListener == null) {
    //createTargetDataListener创建目标监听器,方法见下面[1]
    dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener));
    targetListener = dataListenerMap.get(listener);
    }
    //访问zk将targetListener注册为监听器,方法代码见[2]
    addTargetDataListener(path, targetListener, executor);
    }
    //[1] 以zk为配置中心为例
    protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
    return new CuratorWatcherImpl(client, listener);
    }
    //[2]
    public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
    try {
    return client.getChildren().usingWatcher(listener).forPath(path);
    } catch (NoNodeException e) {
    return null;
    } catch (Exception e) {
    throw new IllegalStateException(e.getMessage(), e);
    }
    }

    zkClient.addDataListener方法的监听器其实是CuratorWatcherImpl ,CuratorWatcherImpl的代码如下:

    static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
    private CuratorFramework client;
    private volatile ChildListener childListener;
    private volatile DataListener dataListener;
    private String path;
    
    public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
    //代码删减
    }
    //createTargetDataListener方法调用下面的方法创建CuratorWatcherImpl对象,
    //从这里可以看出,只监听zk节点数据的变化
    public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
    this.dataListener = dataListener;
    }
    
    //...
    //代码删减
    
    //当数据有变化时,通知调用下面的方法
    //本方法主要是做类型的转换,然后调用CacheListener通知数据变化
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    if (dataListener != null) {
    if (logger.isDebugEnabled()) {
    logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
    }
    TreeCacheEvent.Type type = event.getType();
    EventType eventType = null;
    String content = null;
    String path = null;
    switch (type) {
    case NODE_ADDED:
    eventType = EventType.NodeCreated;
    path = event.getData().getPath();
    content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
    break;
    case NODE_UPDATED:
    eventType = EventType.NodeDataChanged;
    path = event.getData().getPath();
    content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
    break;
    case NODE_REMOVED:
    path = event.getData().getPath();
    eventType = EventType.NodeDeleted;
    break;
    case INITIALIZED:
    eventType = EventType.INITIALIZED;
    break;
    case CONNECTION_LOST:
    eventType = EventType.CONNECTION_LOST;
    break;
    case CONNECTION_RECONNECTED:
    eventType = EventType.CONNECTION_RECONNECTED;
    break;
    case CONNECTION_SUSPENDED:
    eventType = EventType.CONNECTION_SUSPENDED;
    break;
    }
    //调用CacheListener
    dataListener.dataChanged(path, content, eventType);
    }
    }
    }

    CacheListener的dataChanged方法如下:

    public void dataChanged(String path, Object value, EventType eventType) {
    if (eventType == null) {
    return;
    }
    //当发生INITIALIZED类型的事件时,表示连接已经建立,
    //在ZookeeperDynamicConfiguration构造方法中,调用initializedLatch.await方法等待该事件,
    //默认等待5s,超时抛出异常
    if (eventType == EventType.INITIALIZED) {
    initializedLatch.countDown();
    return;
    }
    
    if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
    return;
    }
    
    // TODO We only care the changes happened on a specific path level, for example
    //  /dubbo/config/dubbo/configurators, other config changes not in this level will be ignored
    //本监听器对路径深度有检查,深度至少是四级
    if (path.split("/").length >= MIN_PATH_DEPTH) {
    //获取key值,也就是路径中最后一个“/”后面的内容
    String key = pathToKey(path);
    ConfigChangeType changeType;
    //本监听器只处理下面三种事件:增加、删除、修改
    switch (eventType) {
    case NodeCreated:
    changeType = ConfigChangeType.ADDED;
    break;
    case NodeDeleted:
    changeType = ConfigChangeType.DELETED;
    break;
    case NodeDataChanged:
    changeType = ConfigChangeType.MODIFIED;
    break;
    default:
    return;
    }
    //创建事件ConfigChangedEvent
    ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType);
    //通知各个监听器
    //CacheListener其实是一个复合监听器,包含了多个子监听器
    //CacheListener根据被监听路径将ConfigChangedEvent事件发送给对应的监听器
    Set<ConfigurationListener> listeners = keyListeners.get(path);
    if (CollectionUtils.isNotEmpty(listeners)) {
    listeners.forEach(listener -> listener.process(configChangeEvent));
    }
    }
    }

    dataChanged方法可以看出,CacheListener是一个复合监听器,其持有一个监听器集合,当指定目录下的数据发生变化时,通知集合中的监听器,这个集合可以包含的监听器如下:

    • ServiceConfigurationListener监听目录:/dubbo/config/dubbo/接口名:version:goup.configurators
    • ProviderConfigurationListener监听目录:/dubbo/config/dubbo/ApplicationConfig的name值.configurators
    • ConsumerConfigurationListener监听目录:/dubbo/config/dubbo/ApplicationConfig的name值.configurators
    • ReferenceConfigurationListener监听目录:/dubbo/config/dubbo/接口名:version:goup.configurators
    • TagRouter监听目录:/dubbo/config/dubbo/remote.application属性值.tag-router
    • ServiceRouter监听目录:/dubbo/config/dubbo/接口名:version:goup.condition-router
    • AppRouter监听目录:/dubbo/config/dubbo/应用名.condition-router

    上面这些监听器都是在其构造方法中将自身作为监听器添加到CacheListener的listeners中。

    dubbo在启动时,从配置中心的目录/dubbo/config/dubbo/ConfigCenterBean的configFile或者/dubbo/config/应用名/ConfigCenterBean的appConfigFile下获取的配置数据,从上面7个监听路径可以看出,可以通过修改configFile或者appConfigFile的值,使得ConsumerConfigurationListener或者ProviderConfigurationListener监听上述两个路径。

    但是从路径名含义上分析,我不建议使用上述两个监听器监听这两个目录。

    下面简单介绍前四个监听器的作用:

    1. ServiceConfigurationListener:根据修改后的配置,重新发布服务;
    2. ProviderConfigurationListener:根据修改后的配置,重新发布服务;
    3. ReferenceConfigurationListener:根据修改后的配置,重新建立对远程服务的引用;
    4. ConsumerConfigurationListener:根据修改后的配置,重新建立对远程服务的引用。

    因为ProviderConfigurationListener和ConsumerConfigurationListener监听应用目录,如果dubbo应用发布的服务或者引用的服务比较多,则会造成dubbo修改配置不及时。

    如果需要修改配置,可以在配置中心修改相应目录下的数据,这样上述监听器便监听到数据变化,进而修改本地配置。

    五、ServiceNameMappingListener

    该监听器监听事件ServiceConfigExportedEvent,当发生该事件后,将接口名, 应用名等信息发布到配置中心的/dubbo/config/mapping,这些信息在服务自省里面使用。
    首先看一下这个监听器如何被创建的。

    1、如何创建ServiceNameMappingListener

    DubboBootstrap对象有一个属性是EventDispatcher类,代码如下:

    private final EventDispatcher eventDispatcher = EventDispatcher.getDefaultExtension();

    该属性是一个事件分发器,默认是DirectEventDispatcher,在该类的构造方法中调用下面的方法加载所有的监听器:

    protected void loadEventListenerInstances() {
    //通过SPI加载EventListener的实现类,
    //实现类及其名字参见下方文件org.apache.dubbo.event.EventListener
    ExtensionLoader<EventListener> loader = ExtensionLoader.getExtensionLoader(EventListener.class);
    //将监听器注册到本事件分发器,之后便可以监听本分发器发布的事件
    loader.getSupportedExtensionInstances().forEach(this::addEventListener);
    }

    文件org.apache.dubbo.event.EventListener内容如下:

    service-mapping=org.apache.dubbo.config.event.listener.ServiceNameMappingListener
    config-logging=org.apache.dubbo.config.event.listener.LoggingEventListener
    service-instance=org.apache.dubbo.registry.client.event.listener.CustomizableServiceInstanceListener
    registry-logging=org.apache.dubbo.registry.client.event.listener.LoggingEventListener

    加载完ServiceNameMappingListener之后,便可以监听ServiceConfigExportedEvent事件。

    2、发布事件ServiceConfigExportedEvent

    ServiceConfig类在方法doExport中将服务暴露,暴露完毕后,发布事件ServiceConfigExportedEvent。之后ServiceNameMappingListener监听到该事件,代码如下:

    public class ServiceNameMappingListener implements EventListener<ServiceConfigExportedEvent> {
    //ServiceNameMapping的实现类是DynamicConfigurationServiceNameMapping
    private final ServiceNameMapping serviceNameMapping = getDefaultExtension();
    @Override
    public void onEvent(ServiceConfigExportedEvent event) {
    ServiceConfig serviceConfig = event.getServiceConfig();
    //获取被暴露服务的URL,一般URL只有一个,如果服务以多种协议暴露,那么会有多个
    List<URL> exportedURLs = serviceConfig.getExportedUrls();
    exportedURLs.forEach(url -> {
    String serviceInterface = url.getServiceInterface();//服务的全限定接口名
    //gourp、version、protocol未发布到配置中心,没有使用
    String group = url.getParameter(GROUP_KEY);
    String version = url.getParameter(VERSION_KEY);
    String protocol = url.getProtocol();//被暴露服务使用的协议
    //调用DynamicConfigurationServiceNameMapping的map方法
    serviceNameMapping.map(serviceInterface, group, version, protocol);
    });
    }
    }

    DynamicConfigurationServiceNameMapping的map方法代码如下:

    public void map(String serviceInterface, String group, String version, String protocol) {
    //MetadataService服务信息不发布到配置中心
    if (IGNORED_SERVICE_INTERFACES.contains(serviceInterface)) {
    return;
    }
    DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
    
    String key = getName();//key是应用名
    //content是zk节点的数据值
    String content = valueOf(System.currentTimeMillis());
    execute(() -> {
    //将服务信息发布到配置中心,配置中心可能有多个,在publishConfig方法会遍历每个配置中心发布数据
    //以zk为例,在zk上的路径是/dobbo/config/mapping/接口名/应用名,节点值是content
    dynamicConfiguration.publishConfig(key, buildGroup(serviceInterface, group, version, protocol), content);
    if (logger.isInfoEnabled()) {
    logger.info(String.format("Dubbo service[%s] mapped to interface name[%s].",
    group, serviceInterface, group));
    }
    });
    }

    监听器收到事件ServiceConfigExportedEvent后,将服务的接口名、应用名以及服务发布时间注册到配中心。以zk为例,在配置中心的路径组成是rootPath/mapping/接口名/应用名,rootPath也就是ZookeeperDynamicConfiguration类的rootPath属性值,默认是/dobbo/config,接口名是接口的全限定名,包括了包名在内。

    在服务发现场景中,消费端访问配置中心,通过接口名可以找到可用的应用,具体原理见后一篇文章。

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: