您的位置:首页 > 编程语言 > Java开发

Zookeeper实例原生API--同步/异步创建节点

2018-01-10 00:00 190 查看
摘要: Zookeeper实例原生API--同步创建节点

[2018-01-10 14:32:26.428  INFO org.apache.zookeeper.ZooKeeper:438] [main] Initiating client connection, connectString=centos4:2181 sessionTimeout=5000 watcher=com.timer.zkclient.ZooKeeper_Create_API_Sync_Usage@1ae369b7
[2018-01-10 14:32:26.568  INFO org.apache.zookeeper.ClientCnxn:1032] [main-SendThread(centos4:2181)] Opening socket connection to server centos4/192.168.20.244:2181. Will not attempt to authenticate using SASL (unknown error)
[2018-01-10 14:32:26.572  INFO org.apache.zookeeper.ClientCnxn:876] [main-SendThread(centos4:2181)] Socket connection established to centos4/192.168.20.244:2181, initiating session
[2018-01-10 14:32:26.597  INFO org.apache.zookeeper.ClientCnxn:1299] [main-SendThread(centos4:2181)] Session establishment complete on server centos4/192.168.20.244:2181, sessionid = 0x160db22da860039, negotiated timeout = 5000
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:	SyncConnected
【Watcher-1】事件类型:	None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/zk-test-ephemeral
Success create znode: /zk-test-ephemeral
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态:	SyncConnected
【Watcher-2】事件类型:	NodeCreated
【Watcher-2】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/zk-test-ephemeral/dev
Success create znode: /zk-test-ephemeral/dev
【Watcher-3】收到Watcher通知
【Watcher-3】连接状态:	SyncConnected
【Watcher-3】事件类型:	NodeCreated
【Watcher-3】节点创建
--------------------------------------------


.同步创建节点

create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl, org.apache.zookeeper.CreateMode createMode)

path:创建节点路径,需保证父节点已存在

data:节点数据

acl:权限列表

提供默认的权限OPEN_ACL_UNSAFE、CREATOR_ALL_ACL、READ_ACL_UNSAFE

OPEN_ACL_UNSAFE:完全开放

CREATOR_ALL_ACL:创建该znode的连接拥有所有权限

READ_ACL_UNSAFE:所有的客户端都可读

自定义权限  
ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1"));
ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE,
new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass")));


session设置权限
zk.addAuthInfo("digest", "id:pass".getBytes());  


createMode:节点类型

PERSISTENT:持久化节点

PERSISTENT_SEQUENTIAL:持久化有序节点

EPHEMERAL:临时节点(连接断开自动删除)

EPHEMERAL_SEQUENTIAL:临时有序节点(连接断开自动删除)

2.异步创建节点

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created with IntelliJ IDEA.
* Description:  TODO(ZooKeeper API创建节点,使用异步(async)接口。)
* User: zhubo
* Date: 2018-01-10
* Time: 14:53
*/
public class ZooKeeper_Create_API_ASync_Usage implements Watcher {

/** 定义原子变量 */
AtomicInteger seq = new AtomicInteger();

private static CountDownLatch countDownLatch = new CountDownLatch(1);

public static void main(String[] args) throws Exception{
ZooKeeper zookeeper = new ZooKeeper("centos4:2181",5000,new ZooKeeper_Create_API_ASync_Usage());
countDownLatch.await();
zookeeper.create("/zk-test-ephemeral-","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new IStringCallBack(),"I am context.");
zookeeper.create("/zk-test-ephemeral-","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new IStringCallBack(),"I am context.");
zookeeper.create("/zk-test-ephemeral-","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallBack(),"I am context.");

Thread.sleep(1000L);
}

@Override
public void process(WatchedEvent event) {
System.out.println("进入 process 。。。。。event = " + event);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (event == null) {
return;
}
// 连接状态
Watcher.Event.KeeperState keeperState = event.getState();
// 事件类型
Watcher.Event.EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();

String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (Watcher.Event.EventType.None == eventType) {
System.out.println(logPrefix + "成功连接上ZK服务器");
countDownLatch.countDown();
}
//创建节点
else if (Watcher.Event.EventType.NodeCreated == eventType) {
System.out.println(logPrefix + "节点创建");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//更新节点
else if (Watcher.Event.EventType.NodeDataChanged == eventType) {
System.out.println(logPrefix + "节点数据更新");
System.out.println("我看看走不走这里........");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//更新子节点
else if (Watcher.Event.EventType.NodeChildrenChanged == eventType) {
System.out.println(logPrefix + "子节点变更");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//删除节点
else if (Watcher.Event.EventType.NodeDeleted == eventType) {
System.out.println(logPrefix + "节点 " + path + " 被删除");
}
else{}
}
else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
System.out.println(logPrefix + "与ZK服务器断开连接");
}
else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
System.out.println(logPrefix + "权限检查失败");
}
else if (Watcher.Event.KeeperState.Expired == keeperState) {
System.out.println(logPrefix + "会话失效");
}
else {}
System.out.println("--------------------------------------------");
}

}

class IStringCallBack implements AsyncCallback.StringCallback{
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
}
}

[2018-01-10 15:21:15.002  INFO org.apache.zookeeper.ZooKeeper:438] [main] Initiating client connection, connectString=centos4:2181 sessionTimeout=5000 watcher=com.timer.zkclient.ZooKeeper_Create_API_ASync_Usage@34340fab
[2018-01-10 15:21:15.109  INFO org.apache.zookeeper.ClientCnxn:1032] [main-SendThread(centos4:2181)] Opening socket connection to server centos4/192.168.20.244:2181. Will not attempt to authenticate using SASL (unknown error)
[2018-01-10 15:21:15.112  INFO org.apache.zookeeper.ClientCnxn:876] [main-SendThread(centos4:2181)] Socket connection established to centos4/192.168.20.244:2181, initiating session
[2018-01-10 15:21:15.130  INFO org.apache.zookeeper.ClientCnxn:1299] [main-SendThread(centos4:2181)] Session establishment complete on server centos4/192.168.20.244:2181, sessionid = 0x160db22da860040, negotiated timeout = 5000
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:	SyncConnected
【Watcher-1】事件类型:	None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
Create path result: [0, /zk-test-ephemeral-, I am context., real path name: /zk-test-ephemeral-
Create path result: [-110, /zk-test-ephemeral-, I am context., real path name: null
Create path result: [0, /zk-test-ephemeral-, I am context., real path name: /zk-test-ephemeral-0000000035

注意:

AsyncCallback包含了StatCallback、DataCallback、ACLCallback、ChildrenCallback、Children2Callback、StringCallback和VoidCallback 七种不同回调接口。和同步接口最大的区别就是:节点创建过程(包括网络通信和服务端的节点创建过程)是异步的,并且在同步接口调用过程中,我们需要关注接口抛出的异常,但是在异步接口中,接口本身不会抛出异常的,所有的异常都会在回调函数中通过Result Code 来体现。

扩展

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper java