您的位置:首页 > 其它

zookeeper应用 - FIFO 队列 分布式队列

2015-06-07 19:52 363 查看
使用ZooKeeper实现的FIFO队列,这个队列是分布式的。

package fifo;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 使用ZooKeeper实现的FIFO队列
* @author lisg
*
*/
public class ZKFIFO {
private static final String HOSTS = "vm1";
private ZooKeeper zk = null;
private static final String PARENT_PATH = "/fifo";
private static final String SEQ_PREFIX = "seq-";

public ZKFIFO() {
try {
final CountDownLatch cdl = new CountDownLatch(1);
zk = new ZooKeeper(HOSTS, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(KeeperState.SyncConnected.equals(event.getState())) {
cdl.countDown();
}
}
});

cdl.await();

//创建父节点
Stat stat = zk.exists(PARENT_PATH, false);
if(stat == null) {
zk.create(PARENT_PATH, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
System.out.println("zookeeper集群连接失败!");
e.printStackTrace();
}
}

/**
* 在父节点下创建顺序子节点
* @param data
*/
public void push(String data) {
if(data == null) {
data = "";
}

try {
zk.create(PARENT_PATH + "/" + SEQ_PREFIX,
data.getBytes("UTF-8"),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 删除字第一个子节点,并返回它的值
* @return
*/
public String pop() {
try {
final List<String> children = zk.getChildren(PARENT_PATH, false);
if(children.isEmpty()) {
return null;
}

Collections.sort(children);

String firstChildPath = PARENT_PATH + "/" + children.get(0);

final byte[] data = zk.getData(firstChildPath, false, null);
zk.delete(firstChildPath, -1);

return new String(data, "UTF-8");
} catch (Exception e) {
e.printStackTrace();
}

return null;
}

public void close() {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
final ZKFIFO fifo = new ZKFIFO();

/*
for(int i=0; i<10; i++) {
new Thread() {
public void run() {
fifo.push("data-" + UUID.randomUUID().toString().replace("-", ""));
};
}.start();
}
*/

System.out.println(fifo.pop());

fifo.close();
}
}


  

需要改进的地方:
1)zookeeper异常处理、重试
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: