zookeeper订阅与发布实现
2015-10-30 14:10
281 查看
方式一:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
public class CuratorWatch {
static CuratorFramework zkclient = null;
static String nameSpace = "mo";
static {
String zkhost = "10.144.5.218:2181";// zk的host
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);// 重试机制
Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)
.connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(rp);
builder.namespace(nameSpace);
CuratorFramework zclient = builder.build();
zkclient = zclient;
zkclient.start();// 放在这前面执行
zkclient.newNamespaceAwareEnsurePath("/" + nameSpace);
CuratorWatcher callbackWatcher = new CuratorWatcher() {
public void process(WatchedEvent event) throws Exception {
//由于监听触发后即消失,若想持续监听,需要再次追加监听
byte[] data = (byte[]) ((BackgroundPathable) zkclient.getData().usingWatcher(this))
.forPath("/mojianli");
//TODO 具体业务逻辑
}
};
try {
//首次添加监听
((BackgroundPathable) zkclient.getData().usingWatcher(callbackWatcher))
.forPath("/mojianli");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
Thread.sleep(Long.MAX_VALUE);
}
}
方式二:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
public class CuratorWatch {
static CuratorFramework zkclient = null;
static String nameSpace = "mo";
static {
String zkhost = "10.144.5.218:2181";// zk的host
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);// 重试机制
Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)
.connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(rp);
builder.namespace(nameSpace);
CuratorFramework zclient = builder.build();
zkclient = zclient;
zkclient.start();// 放在这前面执行
zkclient.newNamespaceAwareEnsurePath("/" + nameSpace);
}
public static void main(String[] args) throws Exception {
watch();
Thread.sleep(Long.MAX_VALUE);
}
/**
*
* 监听节点变化
*
*/
public static void watch() throws Exception {
PathChildrenCache cache = new PathChildrenCache(zkclient, "/mo", false);
cache.start();
System.out.println("监听开始/zk........");
PathChildrenCacheListener plis = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED: {
System.out.println("Node added: "
+ ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_UPDATED: {
System.out.println("Node changed: "
+ ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_REMOVED: {
System.out.println("Node removed: "
+ ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
}
}
};
// 注册监听
cache.getListenable().addListener(plis);
}
}
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
public class CuratorWatch {
static CuratorFramework zkclient = null;
static String nameSpace = "mo";
static {
String zkhost = "10.144.5.218:2181";// zk的host
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);// 重试机制
Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)
.connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(rp);
builder.namespace(nameSpace);
CuratorFramework zclient = builder.build();
zkclient = zclient;
zkclient.start();// 放在这前面执行
zkclient.newNamespaceAwareEnsurePath("/" + nameSpace);
CuratorWatcher callbackWatcher = new CuratorWatcher() {
public void process(WatchedEvent event) throws Exception {
//由于监听触发后即消失,若想持续监听,需要再次追加监听
byte[] data = (byte[]) ((BackgroundPathable) zkclient.getData().usingWatcher(this))
.forPath("/mojianli");
//TODO 具体业务逻辑
}
};
try {
//首次添加监听
((BackgroundPathable) zkclient.getData().usingWatcher(callbackWatcher))
.forPath("/mojianli");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
Thread.sleep(Long.MAX_VALUE);
}
}
方式二:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
public class CuratorWatch {
static CuratorFramework zkclient = null;
static String nameSpace = "mo";
static {
String zkhost = "10.144.5.218:2181";// zk的host
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);// 重试机制
Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)
.connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(rp);
builder.namespace(nameSpace);
CuratorFramework zclient = builder.build();
zkclient = zclient;
zkclient.start();// 放在这前面执行
zkclient.newNamespaceAwareEnsurePath("/" + nameSpace);
}
public static void main(String[] args) throws Exception {
watch();
Thread.sleep(Long.MAX_VALUE);
}
/**
*
* 监听节点变化
*
*/
public static void watch() throws Exception {
PathChildrenCache cache = new PathChildrenCache(zkclient, "/mo", false);
cache.start();
System.out.println("监听开始/zk........");
PathChildrenCacheListener plis = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED: {
System.out.println("Node added: "
+ ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_UPDATED: {
System.out.println("Node changed: "
+ ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_REMOVED: {
System.out.println("Node removed: "
+ ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
}
}
};
// 注册监听
cache.getListenable().addListener(plis);
}
}
相关文章推荐
- LeetCode---Construct Binary Tree from Inorder and Postorder Traversal
- 如何使用测试证书来生成越狱版的ipa包
- Android ADB命令大全(通过ADB命令查看wifi密码、MAC地址、设备信息、操作文件、查看文件、日志信息、卸载、启动和安装APK等)
- 主键乱序插入对Innodb性能的影响
- URL Regex expression
- vs2010下release版本调试设置
- 使用CocoaPods一直停留在:Updating local specs repositories
- 1044. 火星数字(20)
- 增加评论逻辑
- Django 源码小剖: 初探 WSGI
- C++学习笔记--GCC编译过程
- 第21章 动态链接库
- 黑马程序员_java集合框架的一些总结
- 排序算法(持续更新...)
- ABP使用及框架解析系列 - [Unit of Work part.2-框架实现]
- iOS: ARC和非ARC下使用Block属性的问题
- 一些服务器编程的概念
- 余弦计算相似度度量(优秀)
- 1043. 输出PATest(20)
- 安装Ubuntu后要做的事