ZooKeeper实现分布式队列Queue
2014-08-01 14:21
357 查看
http://f.dataguru.cn/portal.php?mod=view&aid=3299
前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。通过这篇文章的分布式队列的案例,你将了解到ZooKeeper的强大。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用
目录
分布式队列
设计思路
程序实现
本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。
x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。
队列管理
Zookeeper 可以处理两种类型的队列:
当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。
同步队列用 Zookeeper 实现的实现思路如下:
创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start
的出现,如果已经相等就创建 /synchronizing/start。
用下面的流程图更容易理解:
[align=center]同步队列流程图 [/align]
同步队列的关键代码如下,完整的代码请看附件:
同步队列
当队列没满是进入 wait(),然后会一直等待 Watch 的通知,Watch 的代码如下:
客户端已经与Zookeeper建立连接后,Watcher的process()方法会被调用,参数是一个表示该连接的时间。
FIFO 队列用 Zookeeper 实现思路如下:
实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。
下面是生产者和消费者这种队列形式的示例代码,完整的代码请看附件:
生产者代码
消费者代码
产品流程图
应用实例
图标解释
app1,app2,app3,app4是4个独立的业务系统
zk1,zk2,zk3是ZooKeeper集群的3个连接点
/queue,是znode的队列,假设队列长度为3
/queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
/queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
/queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
/queue/start,当znode队列中满了,触发创建开始节点
当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束
注:
1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
2). app1可以通过zk2提交,app2也可通过zk3提交
3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!
模拟app1,通过zk1,提交3个请求
创建一个与服务器的连接
出始化队列
增加队列节点
检查队列是否完整
启动函数main
运行结果:
完全符合我的们预期。接下来我们看分布式环境
2). 分布式模拟实验
模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3
注:
1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3
执行app1–>zk1
执行app2–>zk2
执行app3–>zk3
/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!
我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。
下面贴一下完整的代码:
前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。通过这篇文章的分布式队列的案例,你将了解到ZooKeeper的强大。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用
目录
分布式队列
设计思路
程序实现
1. 分布式队列
队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。
2. 设计思路
创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。
队列管理
Zookeeper 可以处理两种类型的队列:
当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。
同步队列用 Zookeeper 实现的实现思路如下:
创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start
的出现,如果已经相等就创建 /synchronizing/start。
用下面的流程图更容易理解:
[align=center]同步队列流程图 [/align]
同步队列的关键代码如下,完整的代码请看附件:
同步队列
void addQueue() throws KeeperException, InterruptedException{ zk.exists(root + "/start",true); zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { List<String> list = zk.getChildren(root, false); if (list.size() < size) { mutex.wait(); } else { zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } }
当队列没满是进入 wait(),然后会一直等待 Watch 的通知,Watch 的代码如下:
public void process(WatchedEvent event) { if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){ System.out.println("得到通知"); super.process(event); doAction(); } }
Watch对象接收来自于Zookeeper的回调,以获得各种事件的通知。 Watcher类的接口只有一个方法:
FIFO 队列用 Zookeeper 实现思路如下:
实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。
下面是生产者和消费者这种队列形式的示例代码,完整的代码请看附件:
生产者代码
boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; }
消费者代码
int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null
产品流程图
应用实例
图标解释
app1,app2,app3,app4是4个独立的业务系统
zk1,zk2,zk3是ZooKeeper集群的3个连接点
/queue,是znode的队列,假设队列长度为3
/queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
/queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
/queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
/queue/start,当znode队列中满了,触发创建开始节点
当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束
注:
1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
2). app1可以通过zk2提交,app2也可通过zk3提交
3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!
3. 程序实现
1). 单节点模拟实验模拟app1,通过zk1,提交3个请求
public static void doOne() throws Exception { String host1 = "192.168.1.201:2181"; ZooKeeper zk = connection(host1); initQueue(zk); joinQueue(zk, 1); joinQueue(zk, 2); joinQueue(zk, 3); zk.close(); }
创建一个与服务器的连接
public static ZooKeeper connection(String host) throws IOException { ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() { // 监听/queue/start创建的事件 public void process(WatchedEvent event) { if (event.getPath() != null && event.getPath().equals("/queue/start") && event.getType() == Event.EventType.NodeCreated) { System.out.println("Queue has Completed.Finish testing!!!"); } } }); return zk; }
出始化队列
public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException { System.out.println("WATCH => /queue/start"); zk.exists("/queue/start", true); if (zk.exists("/queue", false) == null) { System.out.println("create /queue task-queue"); zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { System.out.println("/queue is exist!"); } }
增加队列节点
public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException { System.out.println("create /queue/x" + x + " x" + x); zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); isCompleted(zk); }
检查队列是否完整
public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException { int size = 3; int length = zk.getChildren("/queue", true).size(); System.out.println("Queue Complete:" + length + "/" + size); if (length >= size) { System.out.println("create /queue/start start"); zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } }
启动函数main
public static void main(String[] args) throws Exception { doOne(); }
运行结果:
WATCH => /queue/start /queue is exist! create /queue/x1 x1 Queue Complete:1/3 create /queue/x2 x2 Queue Complete:2/3 create /queue/x3 x3 Queue Complete:3/3 create /queue/start start Queue has Completed.Finish testing!!!
完全符合我的们预期。接下来我们看分布式环境
2). 分布式模拟实验
模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3
public static void doAction(int client) throws Exception { String host1 = "192.168.1.201:2181"; String host2 = "192.168.1.201:2182"; String host3 = "192.168.1.201:2183"; ZooKeeper zk = null; switch (client) { case 1: zk = connection(host1); initQueue(zk); joinQueue(zk, 1); break; case 2: zk = connection(host2); initQueue(zk); joinQueue(zk, 2); break; case 3: zk = connection(host3); initQueue(zk); joinQueue(zk, 3); break; } }
注:
1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3
执行app1–>zk1
#日志输出 WATCH => /queue/start /queue is exist! create /queue/x1 x1 Queue Complete:1/3 #zookeeper控制台 [zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue [x10000000011]
执行app2–>zk2
#日志输出 WATCH => /queue/start /queue is exist! create /queue/x2 x2 Queue Complete:2/3 #zookeeper控制台 [zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue [x20000000012, x10000000011]
执行app3–>zk3
#日志输出 WATCH => /queue/start /queue is exist! create /queue/x3 x3 Queue Complete:3/3 create /queue/start start Queue has Completed.Finish testing!!! #zookeeper控制台 [zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue [x30000000016, x10000000014, start, x20000000015]
/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!
我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。
下面贴一下完整的代码:
package org.conan.zookeeper.demo;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class QueueZooKeeper {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
doOne();
} else {
doAction(Integer.parseInt(args[0]));
}
}
public static void doOne() throws Exception {
String host1 = "192.168.1.201:2181";
ZooKeeper zk = connection(host1);
initQueue(zk);
joinQueue(zk, 1);
joinQueue(zk, 2);
joinQueue(zk, 3);
zk.close();
}
public static void doAction(int client) throws Exception { String host1 = "192.168.1.201:2181"; String host2 = "192.168.1.201:2182"; String host3 = "192.168.1.201:2183"; ZooKeeper zk = null; switch (client) { case 1: zk = connection(host1); initQueue(zk); joinQueue(zk, 1); break; case 2: zk = connection(host2); initQueue(zk); joinQueue(zk, 2); break; case 3: zk = connection(host3); initQueue(zk); joinQueue(zk, 3); break; } }
// 创建一个与服务器的连接
public static ZooKeeper connection(String host) throws IOException {
ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
// 监控所有被触发的事件
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
System.out.println("Queue has Completed.Finish testing!!!");
}
}
});
return zk;
}
public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
System.out.println("WATCH => /queue/start");
zk.exists("/queue/start", true);
if (zk.exists("/queue", false) == null) {
System.out.println("create /queue task-queue");
zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
System.out.println("/queue is exist!");
}
}
public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
System.out.println("create /queue/x" + x + " x" + x);
zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
isCompleted(zk);
}
public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
int size = 3;
int length = zk.getChildren("/queue", true).size();
System.out.println("Queue Complete:" + length + "/" + size);
if (length >= size) {
System.out.println("create /queue/start start");
zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
}
相关文章推荐
- 实践:ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- [转载] ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper实现分布式队列Queue
- ZooKeeper分布式队列实现MapReduce任务集成
- Zookeeper分布式队列的实现
- 分布式队列ZooKeeper的实现
- Zookeeper教程(一):快速开始以及结合java实现分布式Barrier和Queue
- Zookeeper教程:快速开始以及结合java实现分布式Barrier和Queue
- 分布式队列ZooKeeper的实现
- ZooKeeper 实现分布式队列
- Zookeeper学习(十):ZooKeeper 实现分布式队列
- zookeeper 实现分布式队列
- ZooKeeper实现分布式队列