您的位置:首页 > 其它

zookeeper - curator的监听事件

2017-06-26 00:00 393 查看
curator-recipes模块提供了zookeeper的一些典型应用场景的使用参考。在curator中,事件监听的支持由curator-recipes模块提供。下面将对这些监听器进行介绍。

监听客户端连接状态

Curator客户端与zookeeper连接的过程其实是一个异步过程,Curator为我们提供了一个监听器监听连接的状态,根据连接的状态做相应的处理。

private CountDownLatch countDownLatch = new CountDownLatch(1);

@Test
public void testClientConnStateListener() throws InterruptedException {
int retryIntervalMs = 1000;
RetryPolicy retryPolicy = new RetryForever(retryIntervalMs);
CuratorFramework testConnStateListenerClient = CuratorFrameworkFactory.builder()
.connectString(ZookeeperHelper.zkAddress)
.sessionTimeoutMs(ZookeeperHelper.sessionTimeout)
.retryPolicy(retryPolicy)
.build();

//添加监听器
testConnStateListenerClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.CONNECTED) {
try {
System.out.println("connected established");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown(); //释放锁
} else {
System.out.println("connection state : " + newState.name());
}
}
});
testConnStateListenerClient.start();
//加锁,暂不往下执行
countDownLatch.await();

testConnStateListenerClient.close();
}

NodeCache

可以监到当前节点数据的变化

@Test
public void testNodeDataListener() throws Exception {
String node_to_listen = "/listened_node";

client.create()
.creatingParentContainersIfNeeded() //自动递归创建父节点
.withMode(CreateMode.PERSISTENT)
.forPath(node_to_listen);

NodeCache nodeCache = new NodeCache(client, node_to_listen, false);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Node data is changed, new data: " +
new String(nodeCache.getCurrentData().getData()));
}
});
nodeCache.start();

Thread.sleep(1000);

client.setData()
.forPath(node_to_listen, "new data".getBytes());//更新节点的数据

Thread.sleep(1000);

nodeCache.close();

client.delete().deletingChildrenIfNeeded().forPath(node_to_listen);
}

PathChildrenCache

(1)可以监听到当前节点下的孩子节点的变化,但是孩子节点下面的孩子节点的事情不能监听。
(2)可以监听到的事件:节点创建、节点数据的变化、节点删除等

@Test
public void testChildrenNodeListener() throws Exception {
String parent_node = "/parent_node";
String child_node = parent_node + "/child";

PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parent_node, false);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED: //子节点被添加
System.out.println("CHILD_ADDED: " + event.getData().getPath());
break;
case CHILD_REMOVED: //子节点被删除
System.out.println("CHILD_REMOVED: " + event.getData().getPath());
break;
case CHILD_UPDATED: //子节点数据变化
System.out.println("CHILD_UPDATED: " + event.getData().getPath());
break;
default:
break;
}
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();

client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(child_node);

Thread.sleep(1000);

client.setData()
.forPath(child_node, "new data".getBytes());//更新节点的数据

Thread.sleep(1000);

pathChildrenCache.close();

client.delete().deletingChildrenIfNeeded().forPath(parent_node);
}

TreeCache

(1)可以监听到指定节点下所有节点的变化。比如当前节点是”/node”,添加了TreeCacheListener后,不仅可以监听节点 "/node/child" 节点的变化,还能监听孙子节点 "/node/child/grandson"的变化。

(2)可以监听到的事件:节点创建、节点数据的变化、节点删除等

@Test
public void testTreeListener() throws Exception {
String parent_path = "/tree_node_parent";
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(parent_path);

Thread.sleep(1000);

TreeCache treeCache = new TreeCache(client, parent_path);
treeCache.start();
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
switch (event.getType()) {
case NODE_ADDED:
System.out
.println("TreeNode added: " + event.getData()
.getPath() + " , data: " + new String(event.getData().getData()));
break;
case NODE_UPDATED:
System.out
.println("TreeNode updated: " + event.getData()
.getPath() + " , data: " + new String(event.getData().getData()));
break;
case NODE_REMOVED:
System.out
.println("TreeNode removed: " + event.getData()
.getPath());
break;
default:
break;
}
}
});

//创建孩子节点
String child_path = parent_path + "/child";
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(child_path);

Thread.sleep(1000);

//创建孙子节点
String grandson_path = child_path + "/grandson";
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(grandson_path);

Thread.sleep(1000);

//更新孙子节点数据
client.setData().forPath(grandson_path, "new_data".getBytes());

Thread.sleep(1000);

//删除孙子节点
client.delete().deletingChildrenIfNeeded().forPath(grandson_path);

Thread.sleep(1000);

treeCache.close();

client.delete().deletingChildrenIfNeeded().forPath(parent_path);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper curator