您的位置:首页 > 编程语言 > Java开发

Zookeeper(五)Java客户端节点操作

2017-02-07 20:25 411 查看
使用同步API创建一个节点

package book.chapter
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
//ZooKeeper API创建节点,使用同步(sync)接口
public class ZooKeeper_Constructor_Usage_With_SID_PASSWD implements Watcher {
private static CountDownLatch connectedSemphore = new CountDownLatch(1);

public static void main(String[] args) {
ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",5000,//
new ZooKeeper_Create_API_Sync_Usage());
connectedSemphore.await();
String path1 = zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
System.out.println("Success create znode:"+path1);

String path2 = zookeeper.create("/zk-test-ephemeral-","".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode:"+path2);
}
public void process(WatchedEvent event){
if(KeeperState.SyncConnected == event.getState()){
connectedSemphore.countDown();
}
}
}
运行程序,输出结果如下:

Receive watched event:WatchedEvent state:SyncConnected type:None path:null

Success create znode:/zk-test-ephemeral-

Success create znode:/zk-test-ephemeral-0001975508

在上面这个程序片段中,使用了同步的节点创建接口:String create(final String path,byte data[],List<ACL> acl,CreateMode createMode)。在接口使用中,我们分别创建了两种类型的节点:临时节点和临时顺序节点。从返回的结果可以看出,如果创建了临时节点,那么API的返回值就是当时传入的path参数:如果创建了临时顺序节点,那么ZooKeeper会自动在节点后加上一个数字,并且在API接口的返回值中返回该数据节点的一个完整的节点路径。

使用异步API创建一个节点

package book.chapter
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

//ZooKeeper API创建节点,使用异步(async)接口
public class ZooKeeper_Create_API-ASync_Usage implements Watcher{
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) {
ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeepr:2181",5000,//
new ZooKeeper_Create_API_ASync_Usage());
connectedSemaphore.await();

zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL,
new IStringCallBack(),"I am contest.");
zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,new IStringCallback(),"I am context.");
zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,new ISstringCallback(),"I am context.");

Thread.sleep(Integer.MAX_VALUE);
}

public void process(WatchedEvent event){
if(KeeperState.SyncConnected == event.getState()){
connectedSemaphore.countDown();
}
}
}

class IStringCallback implements AsyncCallback.StringCallback{
public void proceResult(int rc,String path,Object ctx,String name){
System.out.println("Create path result:【"+rc+","+path+","+","
+ ctx+", real path name:"+ name);
}
}


运行程序,输出结果如下:

Create path result:[0,/zk-test-ephemeral-,I am context, real path name:/zk-test-ephemeral-

Create path result:[-110,/zk-test-ephemeral-,I am context.,real path name: null

Create path result:[0,/zk-test-ephemeral-,I am context., real path name:/zk-test-ephemeral-0001975736

从这个程序片段中可以看出,使用异步方式创建接口也很简单。用户仅仅需要实现AsyncCallback。StringCallback()接口即可。AsyncCallback包含了StatCallback、DataCalback、ACLCallback、ChildrenCallback、Children2Callback、StringCallback和VoidCallback七种不同的回调接口,用户可以在不同的异步接口中实现不同的接口。

和同步接口最大的区别在于,节点的创建过程(包括网络通信和服务端的节点创建过程)是异步的。并且,在同步接口调用过程中,我们需要关注接口抛出异常的可能;但是在异步接口中,接口本身是不会抛出异常的、所有的异常都会在回调函数中通过Result Code(响应吗)来实现。

下面来重点看下回调方法:void processResult(int rc,String path,Object ctx,String name)。这个方法的几个参数主要如表5-4所示。

表5-4

参数名说明
rcResult Code,服务端响应码。客户端可以从这个响应码中识别出API调用的结果,常见的响应码如下:
0(OK):接口调用成功
-4(ConnectionLoss):客户端和服务端连接已断开。
-110(NodeExists):指定节点已存在。
-112(SessionExpired):会话已过期

path接口调用时传入API的数据节点路径参数值
ctx接口调用时传入API的ctx参数值
name实际在服务端创建的节点名。在上述代码中,第三次创建节点时,由于创建的节点类型是顺序节点,因此在服务端没有真正创建好顺序节点之前,客户端无法知道节点的完整节点路径。于是,在回调方法中,服务端会返回这个数据节点的完整节点路径。

4.删除节点

客户端可以通过ZooKeeper的API来删除一个节点,有如下两个接口:

public void delete(final String path,int version)

pulbic void delete(final String path,int version,VoidCallback cb,Object ctx)

这里列出的两个API分别是同步和异步的删除接口,API方法的参数说明如表5-5所示。

表5-5

参数名说明
path指定数据节点的节点路径,即API调用的目的是删除该节点
version指定节点的数据版本,即表明本次删除操作时针对该数据版本进行的
cb注册一个异步回调函数
ctx用于传递上下文信息的对象
删除节点的接口和更新数据的接口是及其相似的,所以这里不再对示例代码做详细讲解,读者可以到book.chapter包下查看示例文件Delete_API_Sync_Usage.java。唯一需要指出的一点是,在ZooKeeper中,只允许删除叶子节点。也就是说,如果一个节点存在至少一个子节点的话,那么该节点将无法被直接删除,必须先删除掉其所有子节点。

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*4.获取子节点

getChildren有8个重载方法

zk.getChildren(path, watch);
zk.getChildren(path, watcher);
zk.getChildren(path, watch, stat);
zk.getChildren(path, watcher, stat);
zk.getChildren(path, watch, cb, ctx);
zk.getChildren(path, watcher, cb, ctx);

其中回调函数有两种 ChildrenCallback Children2Callback

参数说明:

path:节点路径

watch: boolean类型 如果为true 表示使用默认的Watcher 为false表示不需要Watcher

watcher: 通知处理器 在本次获取子节点以后 一旦子节点有变化机会收到服务端传来的通知

stat: 指定数据节点的节点状态信息,传入一个旧的stat对象,当执行方法后 stat会被服务器响应的新stat替换

cb:回调函数 有两种类型 上面已经说过

ctx: 上下文

5.获取节点数据

获取节点数据有4个重载方法

zk.getData(path, watch, stat);
zk.getData(path, watcher, stat);
zk.getData(path, watch, cb, ctx);
zk.getData(path, watcher, cb, ctx);

参数说明:

path: 节点路径

watch:boolean类型 如果为true 表示使用默认的Watcher 为false表示不需要Watcher

stat:指定数据节点的节点状态信息,传入一个旧的stat对象,当执行方法后 stat会被服务器响应的新stat替换

cb:回调函数 有两种类型 上面已经说过

ctx: 上下文

在获取节点数据时候 如果注册watcher 在节点数据发送变化的时候会通知客户端,当客户端收到通知以后,如果想下次数据发送变化再次收到通知,

需要重新注册watcher,获取子节点机制也如此

6.更新节点数据

更新节点数据也分为同步异步两个方法

zk.setData(path, data, version);
zk.setData(path, data, version, cb, ctx);

参数说明:同上 */

/**
* @author liuzhe
* @version v 0.1 2017/2/6 19:59
*/
public class ZookeeperSampleGetDataTest implements Watcher {
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);

public static Stat stat = new Stat();
public static ZooKeeper zk = null;

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path = "/zk-test";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new ZookeeperSampleCreateTest());
connectedSemaphore.await();

//獲取子節點
/*zk.delete(path, 0);
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

//同步方法獲取children
List childs = zk.getChildren(path, true);
System.out.println("childs:"+childs);
//異步方法獲取children
zk.getChildren(path, true, new IChildren2Callback(), null);

zk.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);*/

//獲取節點數據
zk.delete(path, 0);
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);

System.out.println(new String(zk.getData(path, true, stat)));
//czxid:创建该节点的事务ID,mzxid:更新该节点的事务ID version:数据版本
System.out.println("Czxid:" + stat.getCzxid() + "Mzxid:" + stat.getMzxid()
+ "Version:" + stat.getVersion());

zk.setData(path, "123".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}

@Override
public void process(WatchedEvent event) {
System.out.println("fdfdfd");
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
try{
System.out.println("Get Child:"+zk.getChildren(event.getPath(),true));
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println(new String(zk.getData(event.getPath(), true, stat)));
System.out.println("Czxid: " + stat.getCzxid() + "Mzxid: " + stat.getMzxid() + "Version: " + stat.getVersion());
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

class IChildren2Callback implements AsyncCallback.Children2Callback {

@Override
public void processResult(int rc, String path, Object ctx, List<String> childrens, Stat stat) {
System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx " + ctx
+ ", childrens :" + childrens + ", stat: " + stat);
}
}

/*其中更新节点数据 版本version问题 -1表示基于数据的最新版本更新 这里可以作为分布式锁的一个思路 如果客户端参入的version不是数据最新版本则会更新失败

比如目前节点"/zk-test"的数据版本为 2 而某个客户端尝试 执行 setData("/zk-test","test".getBytes(),1) 由于传入version为1 < 服务器目前版本2 这样就会更新失败

7.检测节点是否存在

zk.exists(path, watch);
zk.exists(path, watcher);
zk.exists(path, watch, cb, ctx);
zk.exists(path, watcher, cb, ctx);

如果判断节点是否存在是 注册watcher 会对节点是否存在进行监听--创建节点,删除节点,节点数据更新都会通知客户端

8.权限控制

zookeeper提供了ACL的权限控制机制,简单来说就是通过控制zookeeper服务器上数据节点的ACL,来控制客户端对节点的访问权限

addAuthInfo(String scheme,byte[] auth);

参数说明:

scheme: 权限控制模式 分为: world ,auth,digest,ip和super

auth: 具体的权限信息 类似于shiro的权限字符串

如下代码:

ZooKeeper zk1 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
zk1.addAuthInfo("digest", "test:true".getBytes());
zk1.create("/zk-test-auth", "123".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
ZooKeeper zk2 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
zk2.addAuthInfo("digest", "test:true".getBytes());
System.out.println(new String(zk2.getData("/zk-test-auth", false, null)));
ZooKeeper zk3 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
zk3.addAuthInfo("digest", "test:false".getBytes());
zk3.getData("/zk-test-auth", false, null);

zk2设置了正确的权限 所以可以获取到节点数据 zk3则会抛异常

org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-test-auth*/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: