您的位置:首页 > 其它

zookeeper -- 第八章 zk开源客户端 Curator介绍 (下)

2018-03-06 00:00 429 查看

1、读取数据

构建操作包装类(Builder): GetDataBuilder getData() ---- CuratorFramework

GetDataBuilder

storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服务器端获取的状态数据存储到stat对象

Byte[] forPath (String path) // 节点路劲

public void readNode(String path) throws Exception {
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(path);
System.out.println("读取节点" + path + "的数据:" + new String(data));
System.out.println(stat.toString());
}

2、更新数据

构建操作包装类 (Builder) SetDataBuilder setData() ---- CuratorFramework

SetDataBuilder

withVersion (int version) // 特定版本号

forPath (String path, byte[] data) // 节点路劲

forPath (String path) // 节点路劲

public void updateNode(String path, byte[] data, int version)
throws Exception {
client.setData().withVersion(version).forPath(path, data);
}

3、读取子节点

构建操作包装类(Builder):GetChildrenBuilder getChildren() --- CuratorFramework

GetChildrenBuilder

storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服务器端获取的状态数据存储到stat对象

Byte[] forPath(String path) // 节点路径

usingWatcher(org.apache.zookeeper.Watcher watcher) // 设置watcher ,类似zk本身api ,也只能使用一次

usingWatcher(CuratorWatcher watcher) // 设置watcher ,类似于zk本身的api,也只能使用一次

public void getChildren(String path) throws Exception {
List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator");
for (String pth : children) {
System.out.println("child=" + pth);
}
}

4、设置watcher

1、 NodeCache

监听数据节点的内容变更

监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听

2、PathChildrenCache

监听指定节点的子节点变化情况

包括:新增子节点 子节点数据变更和子节点删除

3、 NodeCache介绍

构造函数

NodeCache(CuratorFramework client, String path)

NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

client : 客户端实例

path : 数据节点路径

dataIsCompressed : 是否进行数据压缩

回调接口

public interface NodeCacheListener {
// 没有参数,怎么获取事件信息以及节点数据
void nodeChanged() throws Exception;
}

4、PathChildrenCache介绍

1、 构造函数

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) {
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) {
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) {
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
}

2、 回调接口

public interface PathChildrenCacheListener {
void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception;
}

3、 构造函数参数

client : 客户端实例

path : 数据节点路径

dataIsCompressed : 是否进行数据压缩

cacheData : 用于配置是否把节点内容缓存起来,如果配置true,那么客户端在接收到节点列表变更时,也

能够获取到节点的数据内容,如果为false则无法取到数据内容

threadFactory : 通过这两个参数构造专门的线程池来处理事件通知

ExecutorService

4、 监听接口

public static enum Type {
CHILD_ADDED,                                // 新增子节点(CHILD_ADDED)
CHILD_UPDATED,                           // 子节点数据变更(CHILD_UPDATED)
CHILD_REMOVED,                          // 子节点删除(CHILD_REMOVED)
CONNECTION_SUSPENDED,
CONNECTION_RECONNECTED,
CONNECTION_LOST,
INITIALIZED;

private Type() {
}
}

5、 PathChildrenCache.StartMode

public static enum StartMode {
// 异步初始化cache
NORMAL,

// 同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据
BUILD_INITIAL_CACHE,

// 异步初始化,初始化完成触发事件, PathChildrenCacheEvent.Type.INITIALIZED
POST_INITIALIZED_EVENT;

private StartMode() {
}
}

6、 代码演示

public void addChildWatcher(String path) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(this.client,
path, true);
cache.start(StartMode.POST_INITIALIZED_EVENT);
System.out.println(cache.getCurrentData().size());
//byte childone[] = cache.getCurrentData().get(0).getData();
//		System.out.println("childone:"
//				+ cache.getCurrentData().get(0).getPath() + ";data="
//				+ new String(childone));
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("客户端子节点cache初始化数据完成");
System.out.println("size="+cache.getCurrentData().size());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
System.out.println("添加子节点:"+event.getData().getPath());
System.out.println("修改子节点数据:"+new String(event.getData().getData()));
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:"+event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点数据:"+event.getData().getPath());
System.out.println("修改子节点数据:"+new String(event.getData().getData()));
}
}
});
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: