Zookeeper_实际应用讲解
2019-05-23 13:51
260 查看
[code]我告诉你zookeeper一般是做什么事的,这里会有一个管理工具,到底怎么去用呢,eclipse和工具去集成
[code]输入zookeeperBrower这个名字,输入http://www.massedynamic.org/eclipse/updates/网址,无非就是把这个插件装上, Help里面有一个eclipsemarket,肯定要求输入名字和地址,安装这个插件,安装上了就可以用这个zookeeper explorer, 这个名字其实你可以随便起的,但是地址你不能随便弄,你如果想在eclipse上去观察的话你可以用这个,我也会发给你 第三方的工具,咱们看一下cluster,这是随便模拟的一个例子,他并不是做一个很真实的场景
[code]如果有一个实际的场景的话,我希望对分布式系统配置,进行集中地管理,我可能是集群,我有好多的服务,比如开始连的 是同样的数据库,都要连zookeeper,一下子变化了,zookeeper地址一下子变化了,怎么办,原先zookeeper在1.1上, 1.1,1.2,1.3,现在整体变成1.4,1.5,1.6了,3台无所谓,如果是30台,300台呢,你可能一个机器一个机器的去改太麻烦了, 这个时候就可以使用watcher来做这个事情,去观察,动态的去修改,这个后期再去讲实际的案例吧,我现在有一个非常简单的 DEMO,首先我有连个client端,client1和client2,你可以把这两个哥们当成两台机器,这两台机器在工作中怎么去用呢, 项目启动的时候,我都要watcher启动起来,然后我就一直在这里等着,就是把项目启动了,我就直接运行,第一台机器连上了 就是运行的,第二台机器也是这样的,首先每一个机器都是watcher,分别有两台机器去watcher了,是什么节点呢,你可以去看一下, 这两个client都同时实例化一个类,以后在工作中你可以把watcher写在一个common里面,也可以写在每一个项目中,这都是行的
[code]package com.learn.zookeeper.cluster; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; /** * 这是好久之前模拟的 * 代码写的也比较low * 主要是让你看到这个效果 * 以后用到一些Curator框架的时候 * 那才是最好的 * 自己刚学会CopyOnWriteArrayList这种集合 * 像这种节点频繁变更的 * 不应该采用这种list * 你宁可采用ArrayList * 或者并发的List * 所以你千万别使用这个 * 你就当看一个逻辑 * 而不是看代码怎么去实现的 * 我要你知道子节点变更的时候是有多麻烦 * 我要知道子节点到底发生了什么样的变化 * 原生的API写的时候是比较麻烦的 * 你看一下process就知道了 * process的时候我做了一堆事 * 我自己都不知道怎么去写的 * 以后我们就用Curator会是更好的方式 * 主要讲的是一个场景 * 我们在真正的工作中有好几台服务器 * 都要公用一份config文件 * 那么我就可以用zookeeper去做这个事情 * * * * @author Leon.Sun * */ public class ZKWatcher implements Watcher { /** zk变量 */ private ZooKeeper zk = null; /** 父节点path */ /** * 有一个/super节点 * super节点它是key * 默认的时候是先去get /super * 然后把IP地址获得 * 获取是其他的服务器 * 咱们举个例子 * 一旦有super节点发生update的时候 * 谁watcher了我就给谁发update * Node Change这个事件 * 直接把新的value给更新上了 * 更新上了就做一个集体的切换 * 很多时候工作就是做这个事 * 这是最简单的一种实现 * * */ static final String PARENT_PATH = "/super"; /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); private List<String> cowaList = new CopyOnWriteArrayList<String>(); /** zookeeper服务器地址 */ // public static final String CONNECTION_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181"; public static final String CONNECTION_ADDR = "59.110.138.145:2181"; /** 定义session失效时间 */ public static final int SESSION_TIMEOUT = 30000; public ZKWatcher() throws Exception{ zk = new ZooKeeper(CONNECTION_ADDR, SESSION_TIMEOUT, this); System.out.println("开始连接ZK服务器"); connectedSemaphore.await(); } @Override public void process(WatchedEvent event) { // 连接状态 KeeperState keeperState = event.getState(); // 事件类型 EventType eventType = event.getType(); // 受影响的path String path = event.getPath(); System.out.println("受影响的path : " + path); if (KeeperState.SyncConnected == keeperState) { // 成功连接上ZK服务器 if (EventType.None == eventType) { System.out.println("成功连接上ZK服务器"); connectedSemaphore.countDown(); try { if(this.zk.exists(PARENT_PATH, false) == null){ this.zk.create(PARENT_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } List<String> paths = this.zk.getChildren(PARENT_PATH, true); for (String p : paths) { System.out.println(p); this.zk.exists(PARENT_PATH + "/" + p, true); } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } //创建节点 else if (EventType.NodeCreated == eventType) { System.out.println("节点创建"); try { this.zk.exists(path, true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } //更新节点 else if (EventType.NodeDataChanged == eventType) { System.out.println("节点数据更新"); try { //update nodes call function this.zk.exists(path, true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } //更新子节点 else if (EventType.NodeChildrenChanged == eventType) { System.out.println("子节点 ... 变更"); try { List<String> paths = this.zk.getChildren(path, true); if(paths.size() >= cowaList.size()){ paths.removeAll(cowaList); for(String p : paths){ this.zk.exists(path + "/" + p, true); //this.zk.getChildren(path + "/" + p, true); System.out.println("这个是新增的子节点 : " + path + "/" + p); //add new nodes call function } cowaList.addAll(paths); } else { cowaList = paths; } System.out.println("cowaList: " + cowaList.toString()); System.out.println("paths: " + paths.toString()); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } //删除节点 else if (EventType.NodeDeleted == eventType) { System.out.println("节点 " + path + " 被删除"); try { //delete nodes call function this.zk.exists(path, true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } else ; } else if (KeeperState.Disconnected == keeperState) { System.out.println("与ZK服务器断开连接"); } else if (KeeperState.AuthFailed == keeperState) { System.out.println("权限检查失败"); } else if (KeeperState.Expired == keeperState) { System.out.println("会话失效"); } else ; System.out.println("--------------------------------------------"); } }
[code]package com.learn.zookeeper.cluster; public class Client1 { public static void main(String[] args) throws Exception{ ZKWatcher myWatcher = new ZKWatcher(); Thread.sleep(100000000); } }
[code]package com.learn.zookeeper.cluster; public class Client2 { public static void main(String[] args) throws Exception{ ZKWatcher myWatcher = new ZKWatcher(); Thread.sleep(100000000); } }
[code]package com.learn.zookeeper.cluster; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; /** * 机器1和机器2是一个集群的方式 * 以后要连同样的一份代码 * 要连redis集群 * 你再开发一套应用 * 这个应用就是用来管理zookeeper上的节点的 * 我这块添加了一个节点了 * 这两台机器应该马上感知到 * 我再添加删除 * 然后这里面的业务逻辑变更 * 其实就是这么一个道理 * 我把test运行一下吧 7ff7 * 运行完了之后Test就停止了 * * * @author Leon.Sun * */ public class Test { /** zookeeper地址 */ // static final String CONNECT_ADDR = "192.168.1.106:2181,192.168.1.107:2181,192.168.1.108:2181"; /** * 这个Test无论做什么事 * 这两个client都能感觉到的 * 比如我这个Test做什么事 * * */ static final String CONNECT_ADDR = "59.110.138.145:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 2000;//ms /** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */ static final CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception{ /** * new了一个zookeeper * 这个Watcher就做了一个很简单的操作 * 连接上了就OK了 * */ ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){ @Override public void process(WatchedEvent event) { //获取事件的状态 KeeperState keeperState = event.getState(); EventType eventType = event.getType(); //如果是建立连接 if(KeeperState.SyncConnected == keeperState){ if(EventType.None == eventType){ //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行 connectedSemaphore.countDown(); System.out.println("zk 建立连接"); } } } }); //进行阻塞 connectedSemaphore.await(); // //创建子节点 /** * 创建了4个节点 * c1,c2,c3,c4 */ zk.create("/super/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //创建子节点 zk.create("/super/c2", "c2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //创建子节点 zk.create("/super/c3", "c3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //创建子节点 zk.create("/super/c4", "c4".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // zk.create("/super/c4/c44", "c44".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //获取节点信息 // byte[] data = zk.getData("/testRoot", false, null); // System.out.println(new String(data)); // System.out.println(zk.getChildren("/testRoot", false)); //修改节点的值 /** * 修改了c1和c2 */ zk.setData("/super/c1", "modify c1".getBytes(), -1); zk.setData("/super/c2", "modify c2".getBytes(), -1); byte[] data = zk.getData("/super/c2", false, null); System.out.println(new String(data)); // //判断节点是否存在 // System.out.println(zk.exists("/super/c3", false)); // //删除节点 /** * 删除了c3 * 其实就是这么简单的操作 * 删除c3了 * 就只剩下c1,c2,c4了 * 我做完这个操作之后 * client1和client2都应该能感知到才行 * 都是触发了这些东西 * client1是这样 * client2也是这样 * 没有任何变化 * 这两个哥们C1和c2收到的信息是一样的 * 可以这么去理解 * 这两个都监听到了 * 就是这个意思 * 理解这个事就行了 * 很多块时候都需要做这个事情 * 降低重复工作 * * * */ zk.delete("/super/c3", -1); zk.close(); } }
相关文章推荐
- 以公司实际应用讲解OpenStack到底是什么
- Java注解的实际应用案例讲解
- 记录ThreadPoolTaskExecutor线程池的在项目中的实际应用,讲解一下线程池的配置和参数理解。
- 以公司实际应用讲解OpenStack到底是什么(入门篇)
- zookeeper 在项目中的实际应用
- DWR ajax 实际应用讲解
- M3U8格式讲解及实际应用分析
- Dubbo实际应用基本配置讲解
- Zookeeper Watcher核心机制·安全认证(ACL)·实际应用
- Dubbo框架应用之(三)--Zookeeper注册中心、管理控制台的安装及讲解
- 从零开始,讲解详细,贴近实际应用,全面掌握用友ERP财务管理
- M3U8格式讲解及实际应用分析
- Dubbo框架应用之(三)--Zookeeper注册中心、管理控制台的安装及讲解
- 从零开始,讲解详细,贴近实际应用,全面掌握用友ERP财务管理
- M3U8格式讲解及实际应用分析
- 从零开始,讲解详细,贴近实际应用,全面掌握用友ERP财务管理
- 【Python】python 反射机制在实际的应用场景讲解
- Android RxJava实际应用案例讲解:使用RxJava的最佳开发场景
- Dubbo框架应用之(三)--Zookeeper注册中心、管理控制台的安装及讲解
- DWR ajax 实际应用讲解