您的位置:首页 > 其它

Zookeeper_实际应用讲解

2019-05-23 13:51 260 查看
[code]我告诉你zookeeper一般是做什么事的,这里会有一个管理工具,到底怎么去用呢,eclipse和工具去集成

[code]输入zookeeperBrower这个名字,输入http://www.massedynamic.org/eclipse/updates/网址,无非就是把这个插件装上,

Help里面有一个eclipsemarket,肯定要求输入名字和地址,安装这个插件,安装上了就可以用这个zookeeper explorer,

这个名字其实你可以随便起的,但是地址你不能随便弄,你如果想在eclipse上去观察的话你可以用这个,我也会发给你

第三方的工具,咱们看一下cluster,这是随便模拟的一个例子,他并不是做一个很真实的场景

[code]如果有一个实际的场景的话,我希望对分布式系统配置,进行集中地管理,我可能是集群,我有好多的服务,比如开始连的

是同样的数据库,都要连zookeeper,一下子变化了,zookeeper地址一下子变化了,怎么办,原先zookeeper在1.1上,

1.1,1.2,1.3,现在整体变成1.4,1.5,1.6了,3台无所谓,如果是30台,300台呢,你可能一个机器一个机器的去改太麻烦了,

这个时候就可以使用watcher来做这个事情,去观察,动态的去修改,这个后期再去讲实际的案例吧,我现在有一个非常简单的

DEMO,首先我有连个client端,client1和client2,你可以把这两个哥们当成两台机器,这两台机器在工作中怎么去用呢,

项目启动的时候,我都要watcher启动起来,然后我就一直在这里等着,就是把项目启动了,我就直接运行,第一台机器连上了

就是运行的,第二台机器也是这样的,首先每一个机器都是watcher,分别有两台机器去watcher了,是什么节点呢,你可以去看一下,

这两个client都同时实例化一个类,以后在工作中你可以把watcher写在一个common里面,也可以写在每一个项目中,这都是行的
[code]package com.learn.zookeeper.cluster;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
* 这是好久之前模拟的
* 代码写的也比较low
* 主要是让你看到这个效果
* 以后用到一些Curator框架的时候
* 那才是最好的
* 自己刚学会CopyOnWriteArrayList这种集合
* 像这种节点频繁变更的
* 不应该采用这种list
* 你宁可采用ArrayList
* 或者并发的List
* 所以你千万别使用这个
* 你就当看一个逻辑
* 而不是看代码怎么去实现的
* 我要你知道子节点变更的时候是有多麻烦
* 我要知道子节点到底发生了什么样的变化
* 原生的API写的时候是比较麻烦的
* 你看一下process就知道了
* process的时候我做了一堆事
* 我自己都不知道怎么去写的
* 以后我们就用Curator会是更好的方式
* 主要讲的是一个场景
* 我们在真正的工作中有好几台服务器
* 都要公用一份config文件
* 那么我就可以用zookeeper去做这个事情
*
*
*
* @author Leon.Sun
*
*/
public class ZKWatcher implements Watcher {

/** zk变量 */
private ZooKeeper zk = null;

/** 父节点path */
/**
* 有一个/super节点
* super节点它是key
* 默认的时候是先去get /super
* 然后把IP地址获得
* 获取是其他的服务器
* 咱们举个例子
* 一旦有super节点发生update的时候
* 谁watcher了我就给谁发update
* Node Change这个事件
* 直接把新的value给更新上了
* 更新上了就做一个集体的切换
* 很多时候工作就是做这个事
* 这是最简单的一种实现
*
*
*/
static final String PARENT_PATH = "/super";

/** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);

private List<String> cowaList = new CopyOnWriteArrayList<String>();

/** zookeeper服务器地址 */
//	public static final String CONNECTION_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181";
public static final String CONNECTION_ADDR = "59.110.138.145:2181";
/** 定义session失效时间 */
public static final int SESSION_TIMEOUT = 30000;

public ZKWatcher() throws Exception{
zk = new ZooKeeper(CONNECTION_ADDR, SESSION_TIMEOUT, this);
System.out.println("开始连接ZK服务器");
connectedSemaphore.await();
}

@Override
public void process(WatchedEvent event) {
// 连接状态
KeeperState keeperState = event.getState();
// 事件类型
EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
System.out.println("受影响的path : " + path);

if (KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (EventType.None == eventType) {
System.out.println("成功连接上ZK服务器");
connectedSemaphore.countDown();
try {
if(this.zk.exists(PARENT_PATH, false) == null){
this.zk.create(PARENT_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List<String> paths = this.zk.getChildren(PARENT_PATH, true);
for (String p : paths) {
System.out.println(p);
this.zk.exists(PARENT_PATH + "/" + p, true);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//创建节点
else if (EventType.NodeCreated == eventType) {
System.out.println("节点创建");
try {
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//更新节点
else if (EventType.NodeDataChanged == eventType) {
System.out.println("节点数据更新");
try {
//update nodes  call function
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//更新子节点
else if (EventType.NodeChildrenChanged == eventType) {
System.out.println("子节点 ... 变更");
try {
List<String> paths = this.zk.getChildren(path, true);
if(paths.size() >= cowaList.size()){
paths.removeAll(cowaList);
for(String p : paths){
this.zk.exists(path + "/" + p, true);
//this.zk.getChildren(path + "/" + p, true);
System.out.println("这个是新增的子节点 : " + path + "/" + p);
//add new nodes  call function
}
cowaList.addAll(paths);
} else {
cowaList = paths;
}
System.out.println("cowaList: " + cowaList.toString());
System.out.println("paths: " + paths.toString());

} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//删除节点
else if (EventType.NodeDeleted == eventType) {
System.out.println("节点 " + path + " 被删除");
try {
//delete nodes  call function
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
else ;
}
else if (KeeperState.Disconnected == keeperState) {
System.out.println("与ZK服务器断开连接");
}
else if (KeeperState.AuthFailed == keeperState) {
System.out.println("权限检查失败");
}
else if (KeeperState.Expired == keeperState) {
System.out.println("会话失效");
}
else ;

System.out.println("--------------------------------------------");
}

}
[code]package com.learn.zookeeper.cluster;

public class Client1 {

public static void main(String[] args) throws Exception{

ZKWatcher myWatcher = new ZKWatcher();
Thread.sleep(100000000);
}
}
[code]package com.learn.zookeeper.cluster;

public class Client2 {

public static void main(String[] args) throws Exception{

ZKWatcher myWatcher = new ZKWatcher();
Thread.sleep(100000000);
}
}
[code]package com.learn.zookeeper.cluster;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
* 机器1和机器2是一个集群的方式
* 以后要连同样的一份代码
* 要连redis集群
* 你再开发一套应用
* 这个应用就是用来管理zookeeper上的节点的
* 我这块添加了一个节点了
* 这两台机器应该马上感知到
* 我再添加删除
* 然后这里面的业务逻辑变更
* 其实就是这么一个道理
* 我把test运行一下吧

7ff7
* 运行完了之后Test就停止了
*
*
* @author Leon.Sun
*
*/
public class Test {

/** zookeeper地址 */
//	static final String CONNECT_ADDR = "192.168.1.106:2181,192.168.1.107:2181,192.168.1.108:2181";
/**
* 这个Test无论做什么事
* 这两个client都能感觉到的
* 比如我这个Test做什么事
*
*
*/
static final String CONNECT_ADDR = "59.110.138.145:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 2000;//ms
/** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

public static void main(String[] args) throws Exception{
/**
* new了一个zookeeper
* 这个Watcher就做了一个很简单的操作
* 连接上了就OK了
*
*/
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
@Override
public void process(WatchedEvent event) {
//获取事件的状态
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
//如果是建立连接
if(KeeperState.SyncConnected == keeperState){
if(EventType.None == eventType){
//如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
connectedSemaphore.countDown();
System.out.println("zk 建立连接");
}
}
}
});

//进行阻塞
connectedSemaphore.await();

//		//创建子节点
/**
* 创建了4个节点
* c1,c2,c3,c4
*/
zk.create("/super/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//创建子节点
zk.create("/super/c2", "c2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//创建子节点
zk.create("/super/c3", "c3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//创建子节点
zk.create("/super/c4", "c4".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

//		zk.create("/super/c4/c44", "c44".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

//获取节点信息
//		byte[] data = zk.getData("/testRoot", false, null);
//		System.out.println(new String(data));
//		System.out.println(zk.getChildren("/testRoot", false));

//修改节点的值
/**
* 修改了c1和c2
*/
zk.setData("/super/c1", "modify c1".getBytes(), -1);
zk.setData("/super/c2", "modify c2".getBytes(), -1);
byte[] data = zk.getData("/super/c2", false, null);
System.out.println(new String(data));

//		//判断节点是否存在
//		System.out.println(zk.exists("/super/c3", false));
//		//删除节点
/**
* 删除了c3
* 其实就是这么简单的操作
* 删除c3了
* 就只剩下c1,c2,c4了
* 我做完这个操作之后
* client1和client2都应该能感知到才行
* 都是触发了这些东西
* client1是这样
* client2也是这样
* 没有任何变化
* 这两个哥们C1和c2收到的信息是一样的
* 可以这么去理解
* 这两个都监听到了
* 就是这个意思
* 理解这个事就行了
* 很多块时候都需要做这个事情
* 降低重复工作
*
*
*
*/
zk.delete("/super/c3", -1);
zk.close();

}
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: