您的位置:首页 > 其它

Zookeeper应用场景之分布式屏障Barrier

2016-06-24 14:30 246 查看
Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。下面放上一个简陋的图以便理解。

要解决的问题如下:1.如何控制所有线程同时开始?所有的线程启动时在zookeeper节点/barrier下插入顺序临时节点,然后检查/barrier下所有children节点的数量是否为所有的线程数,如果不是,则等待。如果是,则开始执行。2.如何等待所有线程结束?所有线程在执行完毕后,都检查/barrier下所有children节点数量是否为0,若不为0,则继续等待。3.用什么类型的节点?根节点使用持久节点(persistent node),子节点使用临时节点(Ephemeral node)根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就crash了,如果是持久节点,节点不会被删除。 首先我写了一个ZookeeperClient,由这个client端负维护一个Zookeeper对象,并对Zookeeper节点进行操作。
public class ZookeeperClient {
public static ZooKeeper zooKeeper;
public static final String IP_ADDRESS="xxxx:2181";
public static void init() throws  Exception{

zooKeeper = new ZooKeeper(IP_ADDRESS, 15000, new Watcher() {

public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState()== Event.KeeperState.SyncConnected) {

}
}

});

}
public static String createTempNode(String path,String data) {
try {
String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("node "+ path +" with data is created,return node "+node);
return node;
} catch (KeeperException e) {
e.printStackTrace();
return "ERROR";
} catch (InterruptedException e) {
e.printStackTrace();
return "ERROR";
}
}
public  static boolean delete(String path,int version) {
try {
zooKeeper.delete(path,version);
System.out.println("delete path:"+ path + "success");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (KeeperException e) {
e.printStackTrace();
return false;
}

}
public static boolean createPersistentNode(String path,String data) {
try {
String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("node "+ path +" with data is created");
return true;
} catch (KeeperException e) {
e.printStackTrace();
return false;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}
public static boolean checkExists(String path){
try {
Stat stat = zooKeeper.exists(path,true);
if(stat!=null) {
return true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public static int getChildrens(String path) {
try {
return zooKeeper.getChildren(path,true).size();
} catch (KeeperException e) {
e.printStackTrace();
return -1;
} catch (InterruptedException e) {
e.printStackTrace();
return  -1;
}
}
}

Barrier类,负责实现屏障的功能
public class Barrier {
private int size;
private String rootPath;
public Barrier(int size,String rootPath){
this.rootPath = rootPath;
this.size = size;

}
public void init() throws Exception {
ZookeeperClient.init();
if(!ZookeeperClient.checkExists(rootPath)){
ZookeeperClient.createPersistentNode(rootPath,"1");
}
}
public boolean enter(String name,String number){
ZookeeperClient.createTempNode(rootPath+"/"+name, number);
//如果节点下children的数量没有达到所有线程的总数,则继续轮询。
//此时要等待所有的线程都在根节点下创建了节点,才开始执行
while(true) {
int size=ZookeeperClient.getChildrens(rootPath);
if (size != this.size) {
try {

Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
return true;
}
}
}
public  boolean exit(String name){
//先删除自己的节点
ZookeeperClient.delete(rootPath+"/"+name,0);
//如果节点下children数量大于0,则继续轮询
//此时要等待所有的线程都删除了节点,即所有线程都做完了该做的事情,才结束线程。确保所有的线程同时结束。
while(true){
int size = ZookeeperClient.getChildrens(rootPath);
if(size!=0) {
System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
return true;
}
}
}
}

BarrierDemo类,负责启动线程,为了模拟等待,我为3个线程设置了不同的休眠时间。预想的结果是t3首先删除节点,此时子节点剩下两个(t1和t2),t3不会结束,而是继续轮询。t2随后删除节点,此时子节点剩下1个(t1),t2和t3继续轮询。t3最后删除节点,此时没有子节点,t1,t2,t3全部结束。
public class BarrierDemo {
public static void main(String[] args) throws  Exception{
Barrier barrier = new Barrier(3,"/barrier");
barrier.init();
Worker worker1 = new Worker(barrier,10000);
Worker worker2 = new Worker(barrier,5000);
Worker worker3 = new Worker(barrier,2000);
Thread t1 = new Thread(worker1,"t1");
Thread t2 = new Thread(worker2,"t2");
Thread t3 = new Thread(worker3,"t3");
t1.start();
t2.start();
t3.start();

}
}
class Worker implements Runnable {
Barrier barrier;
long time;
Worker(Barrier barrier, long time){
this.barrier = barrier;
this.time = time;

}
public void run() {
boolean isEnter=barrier.enter(Thread.currentThread().getName(),"0");
if(isEnter) {
System.out.println(Thread.currentThread().getName()+"is working on something important now");
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
boolean isExit=barrier.exit(Thread.currentThread().getName());
if (isExit) {
System.out.println(Thread.currentThread().getName()+"is exiting..");
}
}
}

运行结果如下
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: