Zookeeper(八)分布式队列
2013-05-11 13:11
525 查看
1. element 方法 获取对列头部第一个元素
查找队列znode 下所有的子节点名称 使用TreeMap给顺序编号排序 返回第一个znode对应的值
获取所有子节点名称
2. remove 返回对列头部的第一个元素的值 并删除该znode
3. take 检索并移除对列头部一个znode对应的值 如果对列为空 则一直等待
4. offer 写入队列尾部 使用永久顺序节点
5.peek 返回对列头部 第一个znode对应的值
6. 返回队列头部第一个znode对应的值 并删除该znode
查找队列znode 下所有的子节点名称 使用TreeMap给顺序编号排序 返回第一个znode对应的值
public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException { TreeMap<Long,String> orderedChildren; while(true){ try{ // 返回对列中的全部元素name 这里使用到了TreeMap // map的key为znode的顺序编号 orderedChildren = orderedChildren(null); }catch(KeeperException.NoNodeException e){ throw new NoSuchElementException(); } if(orderedChildren.size() == 0 ) throw new NoSuchElementException(); for(String headNode : orderedChildren.values()){ if(headNode != null){ try{ // 返回对列头部第一个znode对应数据. return zookeeper.getData(dir+"/"+headNode, false, null); }catch(KeeperException.NoNodeException e){ } } } } }
获取所有子节点名称
private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException { TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>(); List<String> childNames = null; try{ // 返回对列节点下 所有的子znode 名称 childNames = zookeeper.getChildren(dir, watcher); }catch (KeeperException.NoNodeException e){ throw e; } // 所有子节点 for(String childName : childNames){ try{ //Check format if(!childName.regionMatches(0, prefix, 0, prefix.length())){ LOG.warn("Found child node with improper name: " + childName); continue; } String suffix = childName.substring(prefix.length()); // 顺序节点编号 Long childId = new Long(suffix); orderedChildren.put(childId,childName); }catch(NumberFormatException e){ LOG.warn("Found child node with improper format : " + childName + " " + e,e); } } return orderedChildren; }
2. remove 返回对列头部的第一个元素的值 并删除该znode
public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException { TreeMap<Long,String> orderedChildren; while(true){ try{ // 查找所有子节点 用TreeMap排序 orderedChildren = orderedChildren(null); }catch(KeeperException.NoNodeException e){ throw new NoSuchElementException(); } if(orderedChildren.size() == 0) throw new NoSuchElementException(); for(String headNode : orderedChildren.values()){ String path = dir +"/"+headNode; try{ // 对列头部 第一个节点对应的数据 byte[] data = zookeeper.getData(path, false, null); // 删除该节点 zookeeper.delete(path, -1); return data; }catch(KeeperException.NoNodeException e){ // Another client deleted the node first. } } } }
3. take 检索并移除对列头部一个znode对应的值 如果对列为空 则一直等待
public byte[] take() throws KeeperException, InterruptedException { TreeMap<Long,String> orderedChildren; // Same as for element. Should refactor this. while(true){ LatchChildWatcher childWatcher = new LatchChildWatcher(); try{ // 查找所有对列元素 并给对列主znode 设置监视器 orderedChildren = orderedChildren(childWatcher); }catch(KeeperException.NoNodeException e){ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT); continue; } // 如果对列中不存在元素 // 对列节点下不存在子节点, 线程将一直等待. 使用到CountDownLatch // 当客户端调用 offer方法 写入一个元素时 触发LatchChildWatcher监视器CountDownLatch 计数减1 为0时 当前线程获得运行机会 if(orderedChildren.size() == 0){ childWatcher.await(); continue; } for(String headNode : orderedChildren.values()){ String path = dir +"/"+headNode; try{ // 返回对列头部第一个元素 byte[] data = zookeeper.getData(path, false, null); // 删除该元素 zookeeper.delete(path, -1); return data; }catch(KeeperException.NoNodeException e){ // Another client deleted the node first. } } } }
private class LatchChildWatcher implements Watcher { CountDownLatch latch; public LatchChildWatcher(){ latch = new CountDownLatch(1); } public void process(WatchedEvent event){ LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + " type " + event.getType()); latch.countDown(); } public void await() throws InterruptedException { latch.await(); } }
4. offer 写入队列尾部 使用永久顺序节点
public boolean offer(byte[] data) throws KeeperException, InterruptedException{ for(;;){ try{ // 写入对列 节点类型 永久顺序节点 zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL); return true; }catch(KeeperException.NoNodeException e){ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT); } } }
5.peek 返回对列头部 第一个znode对应的值
public byte[] peek() throws KeeperException, InterruptedException{ try{ // 返回对列头部第一个znode的值 return element(); }catch(NoSuchElementException e){ return null; } }
6. 返回队列头部第一个znode对应的值 并删除该znode
public byte[] poll() throws KeeperException, InterruptedException { try{ // 返回对列znode下 第一个子节点值 并删除该节点 return remove(); }catch(NoSuchElementException e){ return null; } }
相关文章推荐
- ZooKeeper实现分布式队列Queue
- Zookeeper分布式队列的实现
- Zookeeper场景实践:(8) 分布式队列
- 基于ZooKeeper的分布式锁和队列
- ZooKeeper实现分布式FIFO队列
- 基于ZooKeeper的分布式锁和队列
- ZooKeeper实现分布式队列Queue
- 分布式队列ZooKeeper的实现
- Zookeeper和Curator-Framework实践之:分布式消息队列
- 基于ZooKeeper的分布式锁和队列
- ZooKeeper的典型应用场景之分布式队列。
- ZooKeeper实现分布式队列Queue
- ZooKeeper分布式队列实现MapReduce任务集成
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式FIFO队列
- ZooKeeper 实现分布式队列
- 基于ZooKeeper的分布式锁和队列
- ZooKeeper 实现分布式队列
- 基于ZooKeeper的分布式锁和队列
- ZooKeeper实现分布式队列