您的位置:首页 > 其它

zookeeper-curator

2017-08-31 17:33 141 查看

Curator_Start

package test.ygy.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/27.
*/
public class Curator_Start {

private static final Logger log = LoggerFactory.getLogger(Curator_Start.class);

public static final String CONNECT_STRING="192.168.150.130:2181,192.168.150.132:2181,192.168.150.133:2181";

public static final  String CONNECT_STRING_YGY = "www.ygy.com:2181";

public static final  int SESSION_TIMEOUT_MS = 5000;

public static final  int CONNECTION_TIMEOUT_MS  = 5000 ;

public static void main(String[] args) throws  Exception {

//public static CuratorFramework newClient(
// String connectString,   //服务器连接地址
// int sessionTimeoutMs,   //会话超时时间,单位为毫秒 默认60000
// int connectionTimeoutMs,  //连接创建超时时间,单位为毫秒。
// RetryPolicy retryPolicy) {  //重试策略接口
//public boolean      allowRetry(int retryCount,   //已经重试的次数
// long elapsedTimeMs,  //从第一次重试开始已经花费的时间,单位毫秒
// RetrySleeper sleeper);  //用于sleep指定的时间

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
//public ExponentialBackoffRetry(int baseSleepTimeMs,   初始sleep时间
// int maxRetries,      最大重试次数
// int maxSleepMs)      最大sleep时间
// copied from Hadoop's RetryPolicies.java  ExponentialBackoffRetry 睡眠策略
//long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
// sleepMs 将不会超过最大sleep时间

//CuratorFramework curatorFramework=CuratorFrameworkFactory.newClient(
//        CONNECT_STRING, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, retryPolicy);

//fluent风格
CuratorFramework curatorFramework=CuratorFrameworkFactory.builder()
.connectString(CONNECT_STRING)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.namespace("isolation")   //不同的命名空间,隔离zookeeper业务,互不干扰
.retryPolicy(retryPolicy).build();

curatorFramework.start();
log.warn(" 启动成功");

Thread.sleep(10000000);
curatorFramework.close();
}

public static CuratorFrameworkFactory.Builder getYGYBuilder() {
return  CuratorFrameworkFactory.builder()
.connectString(CONNECT_STRING_YGY)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
//.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.retryPolicy(new ExponentialBackoffRetry(1000,3));
}

public static CuratorFrameworkFactory.Builder getBaseBuilder() {
return  CuratorFrameworkFactory.builder()
.connectString(CONNECT_STRING)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
//.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.retryPolicy(new ExponentialBackoffRetry(1000,3));
}
}


Curator_Crud

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/27.
*/
public class Curator_Crud {

private static final Logger log=LoggerFactory.getLogger(Curator_Crud.class);

public static void main(String[] args) throws Exception {

CuratorFramework cf=Curator_Start.getBaseBuilder().build();

cf.start();
log.warn(" zookeeper 启动成功");

//创建节点  默认创建持久节点,节点数据默认为空
//************* 报错KeeperErrorCode = ConnectionLoss for /zk-test  curator 与 服务器版本冲突
//cf.create().forPath("/zk-test");

//递归创建
//cf.create().creatingParentsIfNeeded()   //如果父节点不存在 递归创建
//        .withMode(CreateMode.PERSISTENT)    //指定节点类型
//        .forPath("/test/child", "init".getBytes());   //初始化值  /test:空  test/child:init

//删除节点
//cf.delete().forPath("/test/child");

//递归删除当前节点及当前节点的所有子节点
//cf.delete().guaranteed()    //只要当前会话内,curator会在后台持续进行删除,直到节点被删除
//        .deletingChildrenIfNeeded()
//        .forPath("/test");

//获取节点数据
//byte[] bytes=cf.getData().forPath("/test");
//log.warn(" /test 's data is " + new String(bytes));
//23:37:09.563 [main] WARN test.ygy.curator.Curator_Crud -  /test 's data is 1111

//获取节点数据,同时获取stat(服务器创建新stat替换原stat)
//Stat oldStat = new Stat() ;
//byte[] bytes=cf.getData().storingStatIn(oldStat).forPath("/test");
//log.warn(" stat  =" + oldStat);
//23:39:20.694 [main] WARN test.ygy.curator.Curator_Crud -  stat  =21474836550,21474836550,1503848137880,1503848137880,0,0,0,0,4,0,21474836550

//更新节点数据
Stat stat=cf.setData()
.withVersion(-1)  //根据版本信息来修改,可从stat中获取实现cas
.forPath("/test", "new data ".getBytes());
log.warn(" updated  stat = " + stat);
//23:45:05.168 [main] WARN test.ygy.curator.Curator_Crud -  updated  stat = 21474836550,21474836556,1503848137880,1503848704703,1,0,0,0,9,0,21474836550

Thread.sleep(2000);

cf.close();

}
}


Cruator_BackGround

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by guoyao on 2017/8/27.
*/
public class Cruator_BackGround {

private  static  final Logger log =LoggerFactory.getLogger(Cruator_BackGround.class);

public static void main(String[] agrs) throws  Exception {

ExecutorService executorService=Executors.newFixedThreadPool(2);
CountDownLatch countDownLatch=new CountDownLatch(4);

CuratorFramework cf=Curator_Start.getBaseBuilder().build();
cf.start();
log.warn(" zookeeper 启动成功");
Thread.sleep(10000);
//使用线程池
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(
(x, y) -> {   //x ,y : CuratorFramework client 客户端  CuratorEvent event事件
log.warn(" event = " + y);
log.warn(" execcutors threadName = " + Thread.currentThread().getName());
countDownLatch.countDown();
}
,executorService).forPath("/test", "test back".getBytes());
//00:28:38.572 [pool-1-thread-1] WARN test.ygy.curator.Cruator_BackGround -  event = CuratorEventImpl{type=CREATE, resultCode=0, path='/test', name='/test', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null}
//00:28:38.572 [pool-1-thread-1] WARN test.ygy.curator.Cruator_BackGround -  execcutors threadName = pool-1-thread-1

cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(
(x, y) -> {   //x ,y : CuratorFramework client 客户端  CuratorEvent event事件
log.warn(" event = " + y);
log.warn(" execcutors threadName = " + Thread.currentThread().getName());
countDownLatch.countDown();
}
,executorService).forPath("/test", "test back".getBytes());
//00:28:38.621 [pool-1-thread-2] WARN test.ygy.curator.Cruator_BackGround -  event = CuratorEventImpl{type=CREATE, resultCode=-110, path='/test', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null}
//00:28:38.622 [pool-1-thread-2] WARN test.ygy.curator.Cruator_BackGround -  execcutors threadName = pool-1-thread-2

//不使用线程池,默认使用main-EventThread ,多个事件默认时,是严格按照顺序执行。
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(
(x, y) -> {   //x ,y : CuratorFramework client 客户端  CuratorEvent event事件
log.warn(" event = " + y);
log.warn(" execcutors threadName = " + Thread.currentThread().getName());
countDownLatch.countDown();
}
).forPath("/test", "test back".getBytes());
//00:28:38.622 [main-EventThread] WARN test.ygy.curator.Cruator_BackGround -  event = CuratorEventImpl{type=CREATE, resultCode=-110, path='/test', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null}
//00:28:38.622 [main-EventThread] WARN test.ygy.curator.Cruator_BackGround -  execcutors threadName = main-EventThread

cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(
(x, y) -> {   //x ,y : CuratorFramework client 客户端  CuratorEvent event事件
log.warn(" event = " + y);
log.warn(" execcutors threadName = " + Thread.currentThread().getName());
countDownLatch.countDown();
}
).forPath("/test", "test back".getBytes());
//00:28:38.623 [main-EventThread] WARN test.ygy.curator.Cruator_BackGround -  event = CuratorEventImpl{type=CREATE, resultCode=-110, path='/test', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null}
//00:28:38.623 [main-EventThread] WARN test.ygy.curator.Cruator_BackGround -  execcutors threadName = main-EventThread

countDownLatch.await();

Thread.sleep(10000);
executorService.shutdown();
cf.close();
}
}


Curator_NodeCacheListener

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

/**
* Created by guoyao on 2017/8/28.
*/
public class Curator_NodeCacheListener {

private static final Logger log=LoggerFactory.getLogger(Curator_NodeCacheListener.class);

public static void main(String[] args) throws  Exception  {

CountDownLatch countDownLatch=new CountDownLatch(1);

CuratorFramework cf=Curator_Start.getYGYBuilder().build();
cf.start();
log.warn(" zookeeper  启动成功");

//创建一个节点
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/test", "init".getBytes());

//nodeCache  client :客户端  path : 节点路径  dataIsCompressed : 是否压缩
NodeCache nodeCache=new NodeCache(cf, "/test", false);
nodeCache.start(true);
// nodeCache 可以监听指定节点数据变化,与节点是否存在
nodeCache.getListenable().addListener(
()->{
log.warn("data changed to " + new String(nodeCache.getCurrentData().getData()));
countDownLatch.countDown();
}
);

//设置新值,
cf.setData().forPath("/test", " new data ".getBytes());
countDownLatch.await();
cf.delete().forPath("/test");

Thread.sleep(20000);
cf.close();
}
}


Curator_PathChildrenListener

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/29.
*/
public class Curator_PathChildrenListener {

private static final Logger log=LoggerFactory.getLogger(Curator_PathChildrenListener.class);

public static void main(String[] args) throws Exception {

CuratorFramework cf=Curator_Start.getYGYBuilder().build();
cf.start();
log.warn(" zookeepeer  启动成功");

/**
* @param client           the client  客服端
* @param path             path to watch  节点路径
* @param cacheData        if true, node contents are cached in addition to the stat  是否需要获取节点数据
* @param dataIsCompressed if true, data in the path is compressed  是否压缩
* @param executorService  Closeable ExecutorService to use for the PathChildrenCache's background thread
*/

// 默认线程池为守护线程  主线程执行完毕即停止
//return (new ThreadFactoryBuilder()).setNameFormat(processName + "-%d").setDaemon(true).build();
PathChildrenCache pathChildrenCache=new PathChildrenCache(
cf,
"/test",
true
);
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
//10:33:44.078 [PathChildrenCache-0] WARN test.ygy.curator.Curator_PathChildrenListener -  INITIALIZED 初始化
case INITIALIZED:
log.warn(" INITIALIZED 初始化");
break;
// 10:33:45.955 [PathChildrenCache-0] WARN test.ygy.curator.Curator_PathChildrenListener -  CHILD_ADDED 新增子节点 ChildData{path='/test/child1', stat=70,70,1503974025843,1503974025843,0,0,0,98542789669486605,11,0,70
//, data=[116, 101, 115, 116, 32, 99, 104, 105, 108, 100, 49]}
case CHILD_ADDED:
log.warn(" CHILD_ADDED 新增子节点 " + event.getData());
break;
//10:33:54.062 [PathChildrenCache-0] WARN test.ygy.curator.Curator_PathChildrenListener -  CHILD_REMOVED 移除子节点ChildData{path='/test/child3', stat=72,72,1503974029930,1503974029930,0,0,0,98542789669486605,11,0,72
// , data=[116, 101, 115, 116, 32, 99, 104, 105, 108, 100, 51]}
case CHILD_REMOVED:
log.warn(" CHILD_REMOVED 移除子节点" + event.getData());
break;
//10:33:52.056 [PathChildrenCache-0] WARN test.ygy.curator.Curator_PathChildrenListener -  CHILD_UPDATED 修改子节点数据ChildData{path='/test/child2', stat=71,73,1503974027885,1503974031986,1,0,0,98542789669486605,15,0,71
//, data=[110, 101, 119, 32, 99, 104, 105, 108, 100, 50, 32, 100, 97, 116, 97]}
case CHILD_UPDATED:
log.warn(" CHILD_UPDATED 修改子节点数据" + event.getData());
break;
case CONNECTION_LOST:
log.warn(" CONNECTION_LOST 确认失去连接");
break;
case CONNECTION_SUSPENDED:
log.warn(" CONNECTION_SUSPENDED 连接挂起,可能失去连接");
break;
case CONNECTION_RECONNECTED:
log.warn(" CONNECTION_RECONNECTED 重新获取连接");
break;
default:
break;
}
}
}
);
//报错 pathChildrenCache 构造方法中已经会确保该path 被创建
//cf.create().withMode(CreateMode.PERSISTENT).forPath("/test", "test".getBytes());
Thread.sleep(2000);
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/test/child1", "test child1".getBytes());
Thread.sleep(2000);
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/test/child2", "test child2".getBytes());
Thread.sleep(2000);
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/test/child3", "test child3".getBytes());
Thread.sleep(2000);
cf.setData().forPath("/test/child2", "new child2 data".getBytes());
Thread.sleep(2000);
cf.delete().forPath("/test/child3");
Thread.sleep(20000);
cf.close();
}
}


Cruator_LeaderSelector

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.utils.EnsurePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/28.
*/
//leader选举
public class Cruator_LeaderSelector {

private static final Logger log=LoggerFactory.getLogger(Cruator_LeaderSelector.class);

private static final String MASTER_PATH="/curator-master";

public static void main(String[] args) throws  Exception {
CuratorFramework cf=Curator_Start.getBaseBuilder().build();
cf.start();
log.warn(" zookeeper  启动成功");

//保证节点存在
new EnsurePath(MASTER_PATH+"/AAAA").ensure(cf.getZookeeperClient());
//   * @param client          the client  客户端
//  * @param leaderPath      the path for this leadership group  节点路径
//* @param executorService thread pool to use   线程池
// * @param listener        listener   监听器
LeaderSelector leaderSelector = new LeaderSelector(
cf
, MASTER_PATH
, new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {

log.warn(" i am master ");
Thread.sleep(3000);
log.warn(" off master ");
}
});
leaderSelector.autoRequeue();
leaderSelector.start();

Thread.sleep(200000000);
cf.close();
}
}


Curator_Atomic

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/29.
*/
public class Curator_Atomic {

private static final Logger log=LoggerFactory.getLogger(Curator_Atomic.class);

public static void main(String[] args) throws Exception {

CuratorFramework cf=Curator_Start.getBaseBuilder().build();
cf.start();
log.warn("  zookeeper 启动成功");

DistributedAtomicInteger distributedAtomicInteger=
new DistributedAtomicInteger(cf, "/test/atomic", new RetryNTimes(3, 2000));
AtomicValue<Integer> increment1=distributedAtomicInteger.increment();
log.warn("  atomic post inc1 = " + increment1.postValue());
log.warn("  atomic pre inc1 = " + increment1.preValue());
//22:32:31.193 [main] WARN test.ygy.curator.Curator_Atomic -   atomic post inc1 = 1
//22:32:31.193 [main] WARN test.ygy.curator.Curator_Atomic -   atomic pre inc1 = 0
AtomicValue<Integer> increment2=distributedAtomicInteger.increment();
log.warn("  atomic post inc2 = " + increment2.postValue());
log.warn("  atomic pre inc2 = " + increment2.preValue());
//22:32:31.212 [main] WARN test.ygy.curator.Curator_Atomic -   atomic post inc2 = 2
//22:32:31.212 [main] WARN test.ygy.curator.Curator_Atomic -   atomic pre inc2 = 1

Thread.sleep(20000);
cf.close();
}
}


Cruator_InterLock

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by guoyao on 2017/8/28.
*/
public class Cruator_InterLock {

private static final Logger log=LoggerFactory.getLogger(Cruator_InterLock.class);

private  static  int index = 0 ;

public static void main(String[] args) throws Exception {

CuratorFramework cf=Curator_Start.getBaseBuilder().build();
cf.start();
log.warn(" zookeeper  启动成功");
ExecutorService executorService=Executors.newFixedThreadPool(100);
CountDownLatch countDownLatch=new CountDownLatch(100);

//zookeeper 锁
InterProcessMutex interProcessMutex=new InterProcessMutex(cf,"/test/lock");
for (int i=0; i < 100; i++) {
executorService.execute(()->{
try {
Thread.sleep(1000);
interProcessMutex.acquire();
index++;
interProcessMutex.release();
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
countDownLatch.countDown();
}
});
}
executorService.shutdown();
countDownLatch.await();
log.warn("   index add 100 times = " + index);

}
}


Curator_Barrier

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/29.
*/
public class Curator_Barrier {

private static final Logger log=LoggerFactory.getLogger(Curator_Barrier.class);

public static void main(String[] args) throws Exception {
CuratorFramework cf=Curator_Start.getBaseBuilder().build();
cf.start();
log.warn(" zookeeper 启动成功");

DistributedBarrier distributedBarrier = new DistributedBarrier(cf,"/test/barries");
DistributedAtomicInteger distributedAtomicInteger=
new DistributedAtomicInteger(cf, "/test/ato", new RetryNTimes(3, 2000));
//cf.create().creatingParentsIfNeeded().forPath("/test/barries");
Stat stat1=cf.checkExists().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {

}
}).forPath("/test/barries");
log.warn(" check  stat1 = " + stat1);

for(int i = 0 ; i < 10 ; i ++) {
new Thread(
()->{
try {
distributedBarrier.setBarrier();   //设置栅栏
Thread.sleep(1000);
log.warn(Thread.currentThread().getName() + " is waiting");
distributedAtomicInteger.increment();  //计数
distributedBarrier.waitOnBarrier();   //无线等待
//do work
Thread.sleep(2000);
log.warn(Thread.currentThread().getName() + " is running");
} catch (Exception e) {
e.printStackTrace();
}
}
," thread -- " + i).start();
}
int index = 0 ;
while (distributedAtomicInteger.get().postValue() < 10) {

//等待所有任务进入
if (index < distributedAtomicInteger.get().postValue()) {
log.warn(" 当前已备注数为 " + distributedAtomicInteger.get().postValue());
index = distributedAtomicInteger.get().postValue();
}

}
Stat stat2=cf.checkExists().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {

}
}).forPath("/test/barries");
log.warn(" check  stat2 = " + stat2);
distributedBarrier.removeBarrier();
Stat stat3=cf.checkExists().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {

}
}).forPath("/test/barries");
log.warn(" check  stat3 = " + stat3);

Thread.sleep(100000);
cf.close();
}
}


Curator_Barray_Double

package test.ygy.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by guoyao on 2017/8/30.
*/
public class Curator_Barray_Double {

private static final Logger log=LoggerFactory.getLogger(Curator_Barray_Double.class);

public static void main(String[] args) throws Exception {
CuratorFramework cf=Curator_Start.getYGYBuilder().build();
cf.start();
log.warn(" zookeeper 启动成功");
for(int i = 0 ; i < 10 ; i ++) {
new Thread(
()->{
try {
log.warn(Thread.currentThread().getName() + " is waiting");
// 由ourPath 控制个数,需要每次都new一个对象
DistributedDoubleBarrier distributedDoubleBarrier = new DistributedDoubleBarrier(cf,"/test/barries",5);
distributedDoubleBarrier.enter();   //设置栅栏
Thread.sleep(1000);
//do work
log.warn(Thread.currentThread().getName() + " is working");
distributedDoubleBarrier.leave();   //退出
log.warn(Thread.currentThread().getName() + " is out");
} catch (Exception e) {
e.printStackTrace();
}
}
," thread -- " + i).start();
}

Thread.sleep(100000);
cf.close();

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  curator