Zookeeper应用场景之分布式屏障Barrier
2016-06-24 14:30
246 查看
Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。下面放上一个简陋的图以便理解。
要解决的问题如下:1.如何控制所有线程同时开始?所有的线程启动时在zookeeper节点/barrier下插入顺序临时节点,然后检查/barrier下所有children节点的数量是否为所有的线程数,如果不是,则等待。如果是,则开始执行。2.如何等待所有线程结束?所有线程在执行完毕后,都检查/barrier下所有children节点数量是否为0,若不为0,则继续等待。3.用什么类型的节点?根节点使用持久节点(persistent node),子节点使用临时节点(Ephemeral node)根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就crash了,如果是持久节点,节点不会被删除。 首先我写了一个ZookeeperClient,由这个client端负维护一个Zookeeper对象,并对Zookeeper节点进行操作。
Barrier类,负责实现屏障的功能
BarrierDemo类,负责启动线程,为了模拟等待,我为3个线程设置了不同的休眠时间。预想的结果是t3首先删除节点,此时子节点剩下两个(t1和t2),t3不会结束,而是继续轮询。t2随后删除节点,此时子节点剩下1个(t1),t2和t3继续轮询。t3最后删除节点,此时没有子节点,t1,t2,t3全部结束。
运行结果如下
要解决的问题如下:1.如何控制所有线程同时开始?所有的线程启动时在zookeeper节点/barrier下插入顺序临时节点,然后检查/barrier下所有children节点的数量是否为所有的线程数,如果不是,则等待。如果是,则开始执行。2.如何等待所有线程结束?所有线程在执行完毕后,都检查/barrier下所有children节点数量是否为0,若不为0,则继续等待。3.用什么类型的节点?根节点使用持久节点(persistent node),子节点使用临时节点(Ephemeral node)根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就crash了,如果是持久节点,节点不会被删除。 首先我写了一个ZookeeperClient,由这个client端负维护一个Zookeeper对象,并对Zookeeper节点进行操作。
public class ZookeeperClient { public static ZooKeeper zooKeeper; public static final String IP_ADDRESS="xxxx:2181"; public static void init() throws Exception{ zooKeeper = new ZooKeeper(IP_ADDRESS, 15000, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState()== Event.KeeperState.SyncConnected) { } } }); } public static String createTempNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("node "+ path +" with data is created,return node "+node); return node; } catch (KeeperException e) { e.printStackTrace(); return "ERROR"; } catch (InterruptedException e) { e.printStackTrace(); return "ERROR"; } } public static boolean delete(String path,int version) { try { zooKeeper.delete(path,version); System.out.println("delete path:"+ path + "success"); return true; } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (KeeperException e) { e.printStackTrace(); return false; } } public static boolean createPersistentNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("node "+ path +" with data is created"); return true; } catch (KeeperException e) { e.printStackTrace(); return false; } catch (InterruptedException e) { e.printStackTrace(); return false; } } public static boolean checkExists(String path){ try { Stat stat = zooKeeper.exists(path,true); if(stat!=null) { return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public static int getChildrens(String path) { try { return zooKeeper.getChildren(path,true).size(); } catch (KeeperException e) { e.printStackTrace(); return -1; } catch (InterruptedException e) { e.printStackTrace(); return -1; } } }
Barrier类,负责实现屏障的功能
public class Barrier { private int size; private String rootPath; public Barrier(int size,String rootPath){ this.rootPath = rootPath; this.size = size; } public void init() throws Exception { ZookeeperClient.init(); if(!ZookeeperClient.checkExists(rootPath)){ ZookeeperClient.createPersistentNode(rootPath,"1"); } } public boolean enter(String name,String number){ ZookeeperClient.createTempNode(rootPath+"/"+name, number); //如果节点下children的数量没有达到所有线程的总数,则继续轮询。 //此时要等待所有的线程都在根节点下创建了节点,才开始执行 while(true) { int size=ZookeeperClient.getChildrens(rootPath); if (size != this.size) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { return true; } } } public boolean exit(String name){ //先删除自己的节点 ZookeeperClient.delete(rootPath+"/"+name,0); //如果节点下children数量大于0,则继续轮询 //此时要等待所有的线程都删除了节点,即所有线程都做完了该做的事情,才结束线程。确保所有的线程同时结束。 while(true){ int size = ZookeeperClient.getChildrens(rootPath); if(size!=0) { System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { return true; } } } }
BarrierDemo类,负责启动线程,为了模拟等待,我为3个线程设置了不同的休眠时间。预想的结果是t3首先删除节点,此时子节点剩下两个(t1和t2),t3不会结束,而是继续轮询。t2随后删除节点,此时子节点剩下1个(t1),t2和t3继续轮询。t3最后删除节点,此时没有子节点,t1,t2,t3全部结束。
public class BarrierDemo { public static void main(String[] args) throws Exception{ Barrier barrier = new Barrier(3,"/barrier"); barrier.init(); Worker worker1 = new Worker(barrier,10000); Worker worker2 = new Worker(barrier,5000); Worker worker3 = new Worker(barrier,2000); Thread t1 = new Thread(worker1,"t1"); Thread t2 = new Thread(worker2,"t2"); Thread t3 = new Thread(worker3,"t3"); t1.start(); t2.start(); t3.start(); } } class Worker implements Runnable { Barrier barrier; long time; Worker(Barrier barrier, long time){ this.barrier = barrier; this.time = time; } public void run() { boolean isEnter=barrier.enter(Thread.currentThread().getName(),"0"); if(isEnter) { System.out.println(Thread.currentThread().getName()+"is working on something important now"); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } boolean isExit=barrier.exit(Thread.currentThread().getName()); if (isExit) { System.out.println(Thread.currentThread().getName()+"is exiting.."); } } }
运行结果如下
相关文章推荐
- genymotion报错 Unable to start VirtualBox部分解决方法
- jdbc连接数据库使用sid和service_name的区别
- 入门Github Pages
- 实验四主存空间的分配和回收
- 面试题14:基于某个条件来调整数组中元素的位置
- sevlet
- [LeetCode129] Sum Root to Leaf Numbers
- Android 布局技巧
- 166. Fraction to Recurring Decimal
- $.post以及解析xml-基于jQuery的Ajax聊天室程序
- SQLSTATE[22001]: String data, right truncated: 1406 Data too long for column 'dtdate' 解决方法
- Ubuntu/Linux网络配置命令
- Android项目重构之路:架构篇
- 分享 兼容ie6
- 计算月和日
- HashMap与HashTable的区别
- iOS 真机 Log日志
- 用开源工具Xplico助力网络应用层数据解码
- 用开源工具Xplico助力网络应用层数据解码
- 遍历Arraylist的方法: