基于ZooKeeper与zkclient的统一配置管理实现(二)
2017-01-16 00:00
525 查看
摘要: 基于ZooKeeper与zkclient实现统一的配置管理,包括配置文件的发布、修改,本地配置文件的更新等
上一篇博客《基于ZooKeeper与zkclient的统一配置管理实现(一)》分享了基于ZooKeeper原生api实现的统一配置管理,本篇文章将通过使用zkclient封装后的api来再次实现该功能。
实现的效果与上一篇文章类似,这里不再赘述。
启动ZooKeeper集群的方法同上一篇博客,这里不再赘述,各位可以自行异步《基于ZooKeeper与zkclient的统一配置管理实现(一)》查看。
其中IConfigPublisher和IConfigSubscriber是接口,看一下两个接口的定义:
IConfigPublisher接口只有一个publish方法,主要的工作就是把configDir目录下的配置文件发布到zkServerHost的configRootNode节点中去。
IConfigSubscriber接口也只有一个方法,主要的工作就是订阅ZkServerHost中的configRootNode节点的变化情况。
现在来看下这两个主要的核心接口的实现情况。
实现方法很简单,先创建了一个ZkClient对象,然后创建了一个根节点,最后扫描了指定目录下的所有配置文件,并将符合要求的配置文件(及目录)加入到ZooKeeperServer中去。
subscribe方法的具体实现也很简单,先是清空本地的配置文件目录,然后订阅根节点和递归订阅所有子节点。订阅的时候,会将每个节点和该节点的子节点的情况保存到Map中去,具体的原因在上一篇博客中已经做过说明,这个不再赘述。具体执行订阅的方法是ZkConfigSubscriber类中的subscribePath()方法,来看下该方法的内容:
主要是用一个CopyOnWriteArraySet存储所有已经订阅的节点的path,防止重复订阅。
其中订阅时使用了两个Listener类,分别是ChildrenChangeListener和DataChangeListener。
ZkClient会在NodeChildChanged事件发生时主动触发IZkChildListener接口的handleChildChange方法。所以我们只需要实现IZkChildListener接口的handleChildChange方法即可,并且同一个path只需要订阅一次,zkclient会自动为我们对path进行续订。
需要注意的是,当出现新增或修改事件时,只需要将最新的配置文件的内容同步到本地即可,但是出现删除事件时,除了需要删除本地的相关配置文件,还需要将已经订阅的事件取消掉,也就是需要执行ZkConfigSubscriber.unsubscribePath()方法。
至此,通过ZkClient重构的统一配置管理框架就完成了。
经过实际测试,zkclient可以完美解决上一篇博客中未解决的问题,这得益于zkclient大量正确的使用了retryUntilConnected方法。
上一篇博客《基于ZooKeeper与zkclient的统一配置管理实现(一)》分享了基于ZooKeeper原生api实现的统一配置管理,本篇文章将通过使用zkclient封装后的api来再次实现该功能。
实现的效果与上一篇文章类似,这里不再赘述。
系统的结构
系统仍然是由四个组件组成:ZooKeeperServer
集群或单机版的ZooKeeper服务端,主要用以存储IConfigPublisher发布的配置文件信息IConfigPublisher
配置文件的发布器,负责将配置文件信息发布到ZooKeeperServer中去IConfigSubscriber
配置文件变更情况的订阅器,由客户端开启对服务器配置文件信息的订阅,当配置信息发生变更时,负责将本地的信息更新成最新的状态ZkConfigChanger
配置文件更改器,一般由用户手动调用,用来更改配置文件的信息启动ZooKeeper集群的方法同上一篇博客,这里不再赘述,各位可以自行异步《基于ZooKeeper与zkclient的统一配置管理实现(一)》查看。
其中IConfigPublisher和IConfigSubscriber是接口,看一下两个接口的定义:
/** * Config files publisher * @author hwang * */ public interface IConfigPublisher { /** * publish config files under {@link configDir} to {@link configRootNode} of {@link zkServerHost} * @param zkServerHost * @param configRootNode * @param configDir */ public void publish(String zkServerHost,String configRootNode,String configDir); }
IConfigPublisher接口只有一个publish方法,主要的工作就是把configDir目录下的配置文件发布到zkServerHost的configRootNode节点中去。
/** * Subscribe Config files change * @author hwang * */ public interface IConfigSubscriber { /** * <p>Subscribe config files change event under rootNode {@link configRootNode} of {@link zkServerHost}</p> * <p>include the dataChange and childrenChange </p> * @param zkServerHost * @param configRootNode */ public void subscribe(String zkServerHost,String configRootNode); }
IConfigSubscriber接口也只有一个方法,主要的工作就是订阅ZkServerHost中的configRootNode节点的变化情况。
现在来看下这两个主要的核心接口的实现情况。
配置文件发布器IConfigPublisher
IConfigPublisher接口的实现类是ZkConfigPublisher,看一下ZkConfigPublisher是怎么实现publish方法的:public class ZkConfigPublisher implements IConfigPublisher{ private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class); private ZkClient client; private String configRootNode; @Override public void publish(String zkServerHost,String configRootNode,String configDir){ try{ if(client==null){ client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } this.configRootNode = configRootNode; String rootNode = "/" + configRootNode; // 创建根节点 ZkClientNodeUtil.createNode(client, rootNode, configDir); // 扫描所有配置文件 this.scanConfigFiles(configDir,ZkConstant.ACCEPT_SUFFIX); }catch(Exception e){ logger.error("",e); } } /** * 扫描指定目录下的所有配置文件,并将内容写入到zookeeper节点中 * @param path 扫描的目录 * @param acceptSuffix 接受的文件后缀 * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{ File dir = new File(path); if(dir.exists() && dir.isDirectory()){ File[] subFiles = dir.listFiles(); for(File file : subFiles){ String absPath = file.getAbsolutePath(); String fileName = file.getName(); if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){ this.scanConfigFiles(absPath,acceptSuffix); }else{ String parentDir = file.getParentFile().getAbsolutePath(); // 读取文件内容 String fileContent = FileUtils.readFileToString(file,ZkConstant.CONF_CHAR_SET); // 创建目录节点 ZkClientNodeUtil.createDirNode(client, configRootNode, parentDir); // 创建该目录下的文件节点 ZkClientNodeUtil.createFileNode(client, configRootNode, parentDir, fileName, fileContent); } } } } }
实现方法很简单,先创建了一个ZkClient对象,然后创建了一个根节点,最后扫描了指定目录下的所有配置文件,并将符合要求的配置文件(及目录)加入到ZooKeeperServer中去。
配置文件订阅器IConfigSubscriber
IConfigSubscriber接口的实现类是ZkConfigSubscriber,看一下ZkConfigSubscriber是怎么实现subscribe方法的:public class ZkConfigSubscriber implements IConfigSubscriber{ private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class); private ZkClient client; @Override public void subscribe(String zkServerHost, String configRootNode) { try{ if(client==null){ client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } String rootNode = "/" + configRootNode; this.clearConfigDir(client,rootNode); this.subscribeRootNode(client, rootNode); // 等待配置信息变更 Thread.currentThread().join(); }catch(Exception e){ logger.error("",e); } } /** * 清空本地的配置文件目录 * @param client * @param rootNode * @throws IOException * @throws InterruptedException * @throws KeeperException */ private void clearConfigDir(ZkClient client,String rootNode) throws IOException{ if(client.exists(rootNode)){ String configDir = client.readData(rootNode); FileUtils.deleteDirectory(new File(configDir)); logger.info("Delete config dir:"+configDir); } } /** * 订阅根节点和递归订阅所有子节点 * @param client * @param rootNodePath * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void subscribeRootNode(ZkClient client,String rootNodePath) throws IOException{ if(client.exists(rootNodePath)){ logger.debug("subscribe node:"+rootNodePath); ZkConfigSubscriber.subscribePath(client, rootNodePath); List<String> subList = client.getChildren(rootNodePath); if(null!=subList && subList.size()>0){ // 将节点的所有子节点保存起来 NodeChildrenChangedWrapper.addChildren(rootNodePath, subList); } for (String subNode : subList) { this.subscribeSubNode(client,rootNodePath,subNode); } }else{ logger.warn("rootNode:"+rootNodePath+" does not exists!"); } } /** * 订阅子节点 * @param client * @param currentNode * @param subNode * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void subscribeSubNode(ZkClient client,String currentNode,String subNode) throws IOException{ String nodePath = currentNode+"/"+subNode; if(nodePath.startsWith("/")){ // 订阅子节点 if(client.exists(nodePath)){ // sync content to client String content = client.readData(nodePath); OnetimeConfigSyncer.syncToClient(content); logger.debug("subscribe node:"+nodePath); ZkConfigSubscriber.subscribePath(client, nodePath); List<String> subList = client.getChildren(nodePath); if(null!=subList && subList.size()>0){ // 将节点的所有子节点保存起来 NodeChildrenChangedWrapper.addChildren(nodePath, subList); } for (String _subNode : subList) { this.subscribeSubNode(client,nodePath,_subNode); } }else{ logger.warn("subNode:"+nodePath+" does not exists!"); } } } }
subscribe方法的具体实现也很简单,先是清空本地的配置文件目录,然后订阅根节点和递归订阅所有子节点。订阅的时候,会将每个节点和该节点的子节点的情况保存到Map中去,具体的原因在上一篇博客中已经做过说明,这个不再赘述。具体执行订阅的方法是ZkConfigSubscriber类中的subscribePath()方法,来看下该方法的内容:
/** * Store the paths that already subscribed */ private static Set<String> subscribedPathSet = new CopyOnWriteArraySet<String>(); public static void subscribePath(ZkClient client,String path){ if(!subscribedPathSet.contains(path)){ subscribedPathSet.add(path); // Subscribe ChildChange and DataChange event at path client.subscribeChildChanges(path, new ChildrenChangeListener(client)); client.subscribeDataChanges(path, new DataChangeListener(client)); logger.info("Subscribe ChildChange and DataChange event at path:"+path); } } public static void unsubscribePath(ZkClient client,String path){ if(subscribedPathSet.contains(path)){ subscribedPathSet.remove(path); // Unsubscribe ChildChange and DataChange event at path client.unsubscribeChildChanges(path, new ChildrenChangeListener(client)); client.unsubscribeDataChanges(path, new DataChangeListener(client)); logger.info("Unsubscribe ChildChange and DataChange event at path:"+path); } }
主要是用一个CopyOnWriteArraySet存储所有已经订阅的节点的path,防止重复订阅。
其中订阅时使用了两个Listener类,分别是ChildrenChangeListener和DataChangeListener。
ChildrenChangeListener
先来看下ChildrenChangeListener的实现:/** * ChildrenChangeListener * @author hwang * */ public static class ChildrenChangeListener implements IZkChildListener{ private static final Log logger = LogFactory.getLog(ChildrenChangeListener.class); private ZkClient client; public ChildrenChangeListener(ZkClient client){ this.client = client; } @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if(currentChilds==null || currentChilds.isEmpty()){ logger.warn("No currentChilds get form parentPath:"+parentPath); return; } ChildrenChangeResult changeResult = NodeChildrenChangedWrapper.diff(parentPath, currentChilds); ChildrenChangeType changeType = changeResult.getChangeType(); List<String> changePath = changeResult.getChangePath(); if(changePath==null || changePath.isEmpty()){ logger.warn("No children changePath get form parentPath:"+parentPath); return; } switch(changeType){ case add:{ for(String subPath : changePath){ logger.info("Add children node,path:"+parentPath+"/"+subPath); String path = parentPath+"/"+subPath; RealtimeConfigSyncer.syncToClient(client,path); } }break; case delete:{ for(String subPath : changePath){ ZkConfigSubscriber.unsubscribePath(client, subPath); String filePath = subPath.replaceAll(ZkConstant.SEPRATOR, "/"); FileUtils.deleteQuietly(new File(filePath)); logger.info("Delete children node,file:"+filePath); } }break; case update:{ logger.info("Update children node,will do nothing"); }break; default:{ logger.info("Default children node operate,will do nothing"); }break; } } }
ZkClient会在NodeChildChanged事件发生时主动触发IZkChildListener接口的handleChildChange方法。所以我们只需要实现IZkChildListener接口的handleChildChange方法即可,并且同一个path只需要订阅一次,zkclient会自动为我们对path进行续订。
DataChangeListener
同样的还有IZkDataListener接口,我们只需要实现IZkDataListener接口的handleDataChange和handleDataDeleted方法即可,下面就是该接口的具体实现情况:/** * DataChangeListener * @author hwang */ public static class DataChangeListener implements IZkDataListener{ private static final Log logger = LogFactory.getLog(DataChangeListener.class); private ZkClient client; public DataChangeListener(ZkClient client){ this.client = client; } @Override public void handleDataChange(String dataPath, Object data) throws Exception { logger.info("handleDataChange event,dataPath:"+dataPath); RealtimeConfigSyncer.syncToClient(client,dataPath); } @Override public void handleDataDeleted(String dataPath) throws Exception { logger.info("handleDataDeleted event,dataPath:"+dataPath); ZkConfigSubscriber.unsubscribePath(client, dataPath); String filePath = dataPath.substring(dataPath.indexOf(ZkConstant.SEPRATOR)).replaceAll(ZkConstant.SEPRATOR, "/"); FileUtils.deleteQuietly(new File(filePath)); } }
需要注意的是,当出现新增或修改事件时,只需要将最新的配置文件的内容同步到本地即可,但是出现删除事件时,除了需要删除本地的相关配置文件,还需要将已经订阅的事件取消掉,也就是需要执行ZkConfigSubscriber.unsubscribePath()方法。
配置文件更改器ZkConfigChanger
实现完发布器和订阅器之后,最后的一个就是配置文件更改器了。更改器主要的工作就是用来修改ZooKeeperServer端的配置文件的内容,具体的实现如下:/** * 服务端配置文件更改器 * @author hwang * */ public class ZkConfigChanger { private static final Log logger = LogFactory.getLog(ZkConfigChanger.class); private static ZkClient client; /** * 初始化zkclient */ public static void init(){ if(client==null){ try { client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } catch (Exception e) { logger.error("",e); } } } /** * 新增目录节点 * @param configRootNode * @param dirAbsolutePath 目录的绝对路径,该目录必须是/config/开头的目录 * @throws KeeperException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static boolean addConfigDir(String configRootNode,String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(dirAbsolutePath)){ logger.error("dirAbsolutePath can't be empty"); return false; } return ZkClientNodeUtil.createDirNode(client, configRootNode, dirAbsolutePath); } /** * 删除目录节点 * @param configRootNode * @param dirAbsolutePath * @throws InterruptedException * @throws KeeperException */ public static boolean deleteConfigDir(String configRootNode,String dirAbsolutePath) throws InterruptedException, KeeperException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(dirAbsolutePath)){ logger.error("dirAbsolutePath can't be empty"); return false; } return ZkClientNodeUtil.deleteDirNode(client, configRootNode, dirAbsolutePath); } /** * 新增文件节点 * @param configRootNode * @param fileAbsolutePath 文件的绝对路径,不包括文件名 * @param fileName 文件名 * @param fileContent 文件内容 * @throws KeeperException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static boolean addConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){ logger.error("fileAbsolutePath,fileName,fileContent can't be empty"); return false; } return ZkClientNodeUtil.createFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent); } /** * 删除文件节点 * @param configRootNode * @param fileAbsolutePath 文件的绝对路径,不包括文件名 * @param fileName 文件名 * @throws InterruptedException * @throws KeeperException */ public static boolean deleteConfigFile(String configRootNode,String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){ logger.error("fileAbsolutePath,fileName can't be empty"); return false; } return ZkClientNodeUtil.deleteFileNode(client, configRootNode, fileAbsolutePath, fileName); } /** * 更新配置文件内容 * @param configRootNode * @param fileAbsolutePath * @param fileName * @param fileContent * @throws InterruptedException * @throws KeeperException * @throws UnsupportedEncodingException */ public static boolean updateConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){ logger.error("fileAbsolutePath,fileName,fileContent can't be empty"); return false; } return ZkClientNodeUtil.updateFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent); } }
至此,通过ZkClient重构的统一配置管理框架就完成了。
经过实际测试,zkclient可以完美解决上一篇博客中未解决的问题,这得益于zkclient大量正确的使用了retryUntilConnected方法。
相关文章推荐
- 基于ZooKeeper的统一配置管理实现(一)
- 基于zookeeper实现统一配置管理
- <三>基于Fourinone实现统一配置管理指南和demo
- 基于zookeeper实现统一资源管理
- <三>基于淘宝Fourinone实现统一配置管理指南和demo
- zookeeper和java实现的统一配置管理和集群节点管理简单案例
- Zookeeper应用 - 集中配置管理系统的实现
- Zookeeper应用 - 集中配置管理系统的实现
- 基于saltstack实现的配置集中化管理
- ZooKeeper 笔记(3) 实战应用之【统一配置管理】
- Spring整合quartz配置【一】基于MethodInvokingJobDetailFactoryBean实现任务管理
- 基于xml文件实现系统属性配置管理
- 使用zookeeper实现静态数据中心化配置管理
- [转载] zookeeper应用——集中配置管理系统的实现
- (未写完)Zookeeper学习(六):利用Zookeeper实现配置管理
- Linux管理工作,实例讲解工作中使用ssh证书登录的实际流程,讲解ssh证书登录的配置原理,基于配置原理,解决实际工作中,windows下使用SecureCRT证书登录的各种问题,以及实现hadoo
- 配置VTP域,实现VLAN的统一配置和管理
- 基于配置实现信息管理系统开发基础框架
- 基于saltstack实现的配置集中化管理
- Tachyon Cluster: 基于Zookeeper的Master High Availability(HA)高可用配置实现