您的位置:首页 > 其它

zookeeper学习笔记-zkclient,curator使用

2017-09-27 00:00 239 查看
开源客户端,原生api的不足

连接的创建是异步的,需要开发人员自行编码实现等待

连接没有自动的超时重连机制

Zk本身没提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化

Watcher注册一次只会生效一次,需要不断的重复注册

Watcher的使用方式不符合java本身的术语,如果采用监听器方式,更容易理解

不支持递归创建树形节点

开源客户端---ZkClient介绍

Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Voss一起开发

– 解决session会话超时重连

– Watcher反复注册

– 简化开发api

– 还有.....

https://github.com/sgroschupf/zkclient
开源客户端---Curator介绍

1. 使用CuratorFrameworkFactory工厂的两个静态方法创建客户端

a) static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,

RetryPolicy retryPolicy)

b) static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

2. Start()方法启动

参数说明

connectString 分开的ip:port对

retryPolicy 重试策略,默认四种:Exponential BackoffRetry,RetryNTimes ,RetryOneTime,

RetryUntilElapsed

sessionTimeoutMs 会话超时时间,单位为毫秒,默认60000ms

connectionTimeoutMs 连接创建超时时间,单位为毫秒,默认是15000ms

重试策略

– 实现接口RetryPolicy可以自定重重试策略

• boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)

retryCount 已经重试的次数,如果第一次重试 此值为0

elapsedTimeMs 重试花费的时间,单位为毫秒

sleeper 类似于Thread.sleep,用于sleep指定时间

返回值 如果还会继续重试,则返回true

四种默认重试策略

– ExponentialBackoffRetry

• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

• 当前应该sleep的时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),随着重试次数增加重试时间间隔变大,指数倍增长

参数说明

baseSleepTimeMs 初始sleep时间

maxRetries 最大重试次数

maxSleepMs 最大重试时间

返回值 如果还会继续重试,则返回true

默认重试策略

– RetryNTimes

• RetryNTimes(int n, int sleepMsBetweenRetries)

• 当前应该sleep的时间

参数说明

n 最大重试次数

sleepMsBetweenRetries 每次重试的间隔时间

– RetryOneTime

• 只重试一次

• RetryOneTime(int sleepMsBetweenRetry), sleepMsBetweenRetry为重试间隔的时间

默认重试策略

– RetryUntilElapsed

• RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

• 重试的时间超过最大时间后,就不再重试

参数说明

maxElapsedTimeMs 最大重试时间

sleepMsBetweenRetries 每次重试的间隔时间

Fluent风格的API

– 定义:一种面向对象的开发方式,目的是提高代码的可读性

– 实现方式:通过方法的级联或者方法链的方式实现

– 举例:

zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();

创建节点

– 构建操作包装类(Builder): CreateBuilder create()---- CuratorFramework

– CreateBuilder

• creatingParentsIfNeeded() //递归创建父目录

• withMode(CreateMode mode)//设置节点属性,比如:CreateMode.PERSISTENT,如果是递归创建,创建模式

为临时节点,则只有叶子节点是临时节点,非叶子节点都为持久节点

• withACL(List aclList) //设置acl

• forPath(String path) //指定路劲

删除节点

– 构建操作包装类(Builder):DeleteBuilder delete() -----CuratorFramework

– DeleteBuilder

• withVersion(int version) //特定版本号

• guaranteed() //确保节点被删除

• forPath(String path) //指定路径

• deletingChildrenIfNeeded() //递归删除所有子节点

关于guaranteed:

Solves edge cases where an operation may succeed on the server but connection failure

occurs before a response can be successfully returned to the client

意思是:解决当某个删除操作在服务器端可能成功,但是此时客户端与服务器端的连接中断,而删除的响

应没有成功返回到客户端

底层的本质是重试

关于异步操作

– inBackground()

– inBackground(Object context)

– inBackground(BackgroundCallback callback)

– inBackground(BackgroundCallback callback, Object context)

– inBackground(BackgroundCallback callback, Executor executor)

– inBackground(BackgroundCallback callback, Object context, Executor executor)

从参数看跟zk的原生异步api相同,多了一个线程池,用于执行回调

读取数据

– 构建操作包装类(Builder): GetDataBuilder getData() -----CuratorFramework

– GetDataBuilder

• storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象

• Byte[] forPath(String path)//节点路径

读取子节点

– 构建操作包装类(Builder): GetChildrenBuilder getChildren() -----CuratorFramework

– GetChildrenBuilder

• storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象

• Byte[] forPath(String path)//节点路径

• usingWatcher(org.apache.zookeeper.Watcher watcher) //设置watcher,类似于zk本身的api,也只能使用一次

• usingWatcher(CuratorWatcher watcher) //设置watcher ,类似于zk本身的api,也只能使用一次

设置watcher

– NodeCache

• 监听数据节点的内容变更

• 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听

– PathChildrenCache

• 监听指定节点的子节点变化情况

• 包括:新增子节点 子节点数据变更 和子节点删除

NodeCache

– 构造函数

• NodeCache(CuratorFramework client, String path)

• NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

参数说明

client 客户端实例

path 数据节点路径

dataIsCompressed 是否进行数据压缩

– 回调接口

• public interface NodeCacheListener

void nodeChanged() //没有参数,怎么获取事件信息以及节点数据?

PathChildrenCache

client 客户端实例

path 数据节点路径

dataIsCompressed 是否进行数据压缩

cacheData 用于配置是否把节点内容缓存起来,如果配置为true,那么客户端在接

收到节点列表变更的同时,也能够获取到节点的数据内容,如果为false

则无法取到数据内容

threadFactory 通过这两个参数构造专门的线程池来处理事件通知

executorService

PathChildrenCache

– 监听接口

• 时间类型包括:新增子节点(CHILD_ADDED),子节点数据变更(CHILD_UPDATED),子节点删除(CHILD_REMOVED)

– PathChildrenCache.StartMode

• BUILD_INITIAL_CACHE //同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据

• NORMAL //异步初始化cache

• POST_INITIALIZED_EVENT //异步初始化,初始化完成触发事件PathChildrenCacheEvent.Type.INITIALIZED

zkclient举例

package com.zk.dev.zkClient.day1;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class ZKTest  {

private ZkClient zk;

private String nodeName = "/test";

@Before
public void initTest() {
zk = new ZkClient("localhost:2181");
}

@After
public void dispose() {
zk.close();
}

@Test
public void testListener() throws InterruptedException {
// 监听指定节点的数据变化

zk.subscribeDataChanges(nodeName, new IZkDataListener() {
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("node data changed!");
System.out.println("node=>" + s);
System.out.println("data=>" + o);
System.out.println("--------------");
}

public void handleDataDeleted(String s) throws Exception {
System.out.println("node data deleted!");
System.out.println("s=>" + s);
System.out.println("--------------");

}
});

System.out.println("ready!");

// junit测试时,防止线程退出
while (true) {
TimeUnit.SECONDS.sleep(5);
}
}

@Test
public void testUpdateConfig() throws InterruptedException {
if (!zk.exists(nodeName)) {
zk.createPersistent(nodeName);
}
zk.writeData(nodeName, "1");
zk.writeData(nodeName, "2");
zk.delete(nodeName);
zk.delete(nodeName);
zk.writeData("/test/ba", "bbb");
}
}


curator举例

package com.zk.dev.zkClient.day1;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;

/**
* @see 测试curator框架例子
* @Author:xuehan
* @Date:2016年5月14日下午8:44:49
*/
public class CuratorUtils {

public String connectString = "localhost:2181";
CuratorFramework  zkclient = null ;
public CuratorUtils(){
/**
* connectString连接字符串中间用分号隔开,sessionTimeoutMs session过期时间,connectionTimeoutMs连接超时时间,retryPolicyc连接重试策略
*/
//CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy)
// fluent风格aip
//	CuratorFrameworkFactory.builder().sessionTimeoutMs(5000).connectString(connectString).namespace("/test").build();
// 重连策略,没1一秒重试一次,最大重试次数3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();
zkclient.start();
}
/**
* 递归创建节点
* @param path
* @param data
* @throws Exception
*/
public void createNode(String path, byte[] data) throws Exception{
zkclient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, data);
}
/**
* 递归删除节点
* @param path
* @throws Exception
*/
public void delNode(String path) throws Exception{
zkclient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
}	public void zkClose(){
zkclient.close();
}
public void delNodeCallBack(String path) throws Exception{
zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);
}
public void dataChanges(String path) throws Exception{
final NodeCache  dataWatch =  new NodeCache(zkclient, path);
dataWatch.start(true);
dataWatch.getListenable().addListener(new NodeCacheListener(){

public void nodeChanged() throws Exception {
System.out.println("path==>" + dataWatch.getCurrentData().getPath() + "==data==>" + new String(dataWatch.getCurrentData().getData()));
}

});
zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);
}
public void addChildWatcher(String path) throws Exception{
final PathChildrenCache pc = new PathChildrenCache(zkclient, path, true);
pc.start(StartMode.POST_INITIALIZED_EVENT);
System.out.println("节点个数===>" + pc.getCurrentData().size());
pc.getListenable().addListener(new  PathChildrenCacheListener() {

public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("事件监听到"  + event.getData().getPath());
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("客户端初始化节点完成"  + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
System.out.println("添加节点完成"  + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除节点完成"  + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改节点完成"  + event.getData().getPath());
}
}
});

}

public static void main(String[] args) throws Exception{
CuratorUtils cu = new CuratorUtils();
//		cu.createNode("/test/sb/aa/bb", "erhu".getBytes());
//		cu.delNode("/test");

cu.zkclient.setData().forPath("/aa", "love is not".getBytes());
cu.addChildWatcher("/aa");
try{
Thread.sleep(20000000);
}catch(Exception e){};
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper