您的位置:首页 > 编程语言 > Java开发

zookeeper的初体验

2016-05-30 00:00 357 查看
摘要: zookeeper是一个分布式的,开放源码的分布式应用程序协调服务。它是一个为分布式应用提供一致性服务的软件。

zookeeper的初体验

很多技术文章或者活动喜欢扯大数据,一言不合就说什么鬼东西是大数据,大数据分析,大数据。。。听的耳朵都磨出茧子来了,有时候谈到大数据就会想到分布式部署进行数据分析,组成一个集群,或者若干个集群,其中不管是那些扯淡的大数据也好,都或多或少会用到zookeeper或者类似的做基础,也就是大厦的地基。

废话不多少,先写点我自己了解的基础的知识和zookeeper的一些基本原理,后面再上代码实践一番zookeeper的特性。

zookeeper是什么鬼

zookeeper是一个分布式的,开放源码的分布式应用程序协调服务。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

详情:https://zookeeper.apache.org http://www.yuorfei.com/article?id=27

zookeeper的特点

Zookeeper通过一种和文件系统很像的层级命名空间来让分布式进程互相协同工作。这些命名空间由一系列数据寄存器组成,我们也叫这些数据寄存器为znode。这些znode就有点像是文件系统中的文件和文件夹。和文件系统不一样的是,文件系统的文件是存储在存储区上的,而zookeeper的数据是存储在内存上的。同时,这就意味着zookeeper有着高吞吐和低延迟。说的直白一点就是znode这玩意儿是一个跟Unix文件系统路径相似的节点的东西。ZooKeeper是以FastPaxos算法为基础的,数据一致性方法可以通过共享内存(需要锁)或者消息传递实现,Paxos 算法采用消息传递来实现数据的一致性。Paxos算法存在活锁的问题,即当有多个proposer交错提交时,有可能互相排斥导致没有一个proposer能提交成功,而FastPaxos作了一些优化,通过选举产生一个leader,只有leader才能提交proposer,具体算法可见Fast Paxos。其实我本人对这个算法没有深入了解过,只知道有这么个东西。

zookeeper的基本运转流程

1、选举Leader。

2、同步数据。

3、选举Leader过程中算法有很多,但要达到的选举标准是一致的。

4、集群中大多数的机器得到响应并follow选出的Leader。

zookeeper的使用场景

1、数据发布与订阅,系统将数据发布到zk节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,地址列表等就非常适合使用。

2、分布式命名服务,能过调用zk的create api,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。

3、分布通知/协调,ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理。

4、分布式锁,ZooKeeper为我们保证了强一致性,即用户只要完全信任每时每刻,zk集群中任意节点(一个zk server)上的相同znode的数据是一定是相同的。把zk上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。

5、集群管理,这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化响应。这样的场景中,通常有一个监控系统,实时检测集群机器是否存活。

6、分布式队列,有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。后者通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。

zookeeper的配置

zookeeper下载下来后需要修改一下配置文件,下面是我的配置

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/vagrant/zookeeper/zookeeper-3.4.8/temp
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance #
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#server.A=www.yuorfei.com:C:D
server.1=192.168.1.215:2888:3888
server.2=192.168.1.216:2888:3888
server.3=192.168.1.217:2888:3888

tickTime :基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

dataDir :存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

clientPort :这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

initLimit:这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数,当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒。

syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒

server.A = B:C:D : A表示这个是第几号服务器,B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader

我配置了三台机器,因为zookeeper最少需要三台机器才能启动,启动后途中最少需要存活两台机器,否则它没法进行选举导致报错。启动zookeeper的时候最好是奇数倍的机器启动。

启动zookeeper:进入每一台机器的zookeeper的bin目录,启动:

./zkServer.sh start

停止:

./zkServer.sh stop


随便找一台集群的机器

./zkCli.sh

可以查看zookeeper的运行情况和节点情况,具体的命令可以去看官方的文档,和操作linux差不多。

上代码

1.先来一段操作zookeeper的znode节点实现增删改。

package zookeeper1;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
* Created by hxy on 16/4/16.
*/
public class BasicDemo1 {

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 创建一个与服务器的连接
ZooKeeper zk = new ZooKeeper("192.168.1.215:2181", 60000, new Watcher() {
// 监控所有被触发的事件
public void process(WatchedEvent event) {
System.out.println("EVENT:" + event.getType());
}
});

// 查看根节点
System.out.println("ls / => " + zk.getChildren("/", true));

// 创建一个目录节点
if (zk.exists("/node", true) == null) {
zk.create("/node", "conan".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("create /node conan");
// 查看/node节点数据
System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
// 查看根节点
System.out.println("ls / => " + zk.getChildren("/", true));
}

// 创建一个子目录节点
if (zk.exists("/node/sub1", true) == null) {
zk.create("/node/sub1", "sub1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("create /node/sub1 sub1");
// 查看node节点
System.out.println("ls /node => " + zk.getChildren("/node", true));
}

// 修改节点数据
if (zk.exists("/node", true) != null) {
zk.setData("/node", "changed".getBytes(), -1);
// 查看/node节点数据
System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
}

// 删除节点
if (zk.exists("/node/sub1", true) != null) {
zk.delete("/node/sub1", -1);
zk.delete("/node", -1);
// 查看根节点
System.out.println("ls / => " + zk.getChildren("/", true));
}

// 关闭连接
zk.close();
}

}


2.上面是对zookeeper的znode节点进行操作的简单,下面来一个多生产者生产,多消费者消费的例子,zookeeper做负载均衡。

注:写这个例子的时候为了快点实现,我省略掉了监控节点状态的变化情况,如:某节点挂掉,重新进行选举。

生产者:

package zookeeper2;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
* Created by hxy on 16/5/10.
*/
public class AppServer {

private final static String TEST_NODE_PATH = "/test1";
private final static long SLEEP_TIME = 100;
private final static String zookeeperServer = "192.168.1.215:2181,192.168.1.216:2181,192.168.1.217:2181";
private ZooKeeper zooKeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);

//如果版本号与znode的版本号不一致,将无法删除,是一种乐观加锁机制;如果将版本号设置为-1,不会去检测版本,直接删除;
private final static int VERSION = -1;

/**
* 连接zookeeper
*/
public AppServer connectZookeeper() throws Exception {
zooKeeper = new ZooKeeper(zookeeperServer, 5000, new Watcher() {
public void process(WatchedEvent event) {
if (Event.KeeperState.Disconnected == event.getState()) {
System.out.println(" event state: " + event.getState() + "  path: " + event.getPath());
connectedSemaphore.countDown();
}
}
});
return this;
}

public AppServer initNodeData() throws KeeperException, InterruptedException {
if (zooKeeper.exists(TEST_NODE_PATH, true) != null) {
zooKeeper.delete(TEST_NODE_PATH, VERSION);
zooKeeper.create(TEST_NODE_PATH, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zooKeeper.create(TEST_NODE_PATH, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
System.out.println(" server init node data:0");
return this;
}

public void start() {
System.out.println("the server is start...");
new Thread(new NodeTaskThread()).start();
}

public static void main(String[] args) throws Exception {
AppServer appServer = new AppServer();
appServer.connectZookeeper().initNodeData().start();
}

private class NodeTaskThread implements Runnable {

private boolean writeData(String node) throws KeeperException, InterruptedException {
if (zooKeeper == null || node == null || node.length() <= 0) {
return false;
}
//判断节点是否存在
if (zooKeeper.exists(node, true) != null) {
byte[] bytes = zooKeeper.getData(node, false, null);
String dataString = new String(bytes);
long dataLong = Long.valueOf(dataString) + 1;
dataString = String.valueOf(dataLong);
zooKeeper.setData(node, dataString.getBytes(), VERSION);
System.out.println(" server write data:" + dataLong);
} else {
zooKeeper.create(node, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
return true;
}

public void run() {
while (true) {
try {
writeData(TEST_NODE_PATH);

} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(SLEEP_TIME);  //10秒钟休息再来换uuid
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}


消费者:

package zookeeper2;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
* Created by hxy on 16/5/10.
*/
public class AppClient {
private final static String TEST_NODE_PATH = "/test1";
private final static long SLEEP_TIME = 500;
private final static String zookeeperServer = "192.168.1.215:2181,192.168.1.216:2181,192.168.1.217:2181";
private ZooKeeper zooKeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);

private final static int VERSION = -1;

/**
* 连接zookeeper
*/
public AppClient connectZookeeper() throws Exception {
zooKeeper = new ZooKeeper(zookeeperServer, 5000, new Watcher() {
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
});
return this;
}

public void start() {
System.out.println("the client is start...");
new Thread(new NodeListenThread()).start();
}

public static void main(String[] args) throws Exception {
AppClient ac = new AppClient();
ac.connectZookeeper().start();
}

private class NodeListenThread implements Runnable {

private boolean showDate(String node) throws KeeperException, InterruptedException {
if (zooKeeper == null || node == null || node.length() <= 0) {
System.out.println("find null , fuck ");
return false;
}
//判断节点是否存在
if (zooKeeper.exists(node, true) != null) {
byte[] bytes = zooKeeper.getData(node, false, null);
if (bytes == null || bytes.length <= 0) {
System.out.println("data is null,fuck");
} else {
String dataString = new String(bytes);
long dataLong = Long.valueOf(dataString);
if (dataLong <= 0) {
System.out.println(" count <= 0  sleep  ");
Thread.sleep(SLEEP_TIME);
} else {
dataLong = dataLong - 1;
dataString = String.valueOf(dataLong);
zooKeeper.setData(node, dataString.getBytes(), VERSION);
System.out.println(" count:" + dataLong);
}
}
} else {
// 节点不存在
System.out.println("node:'" + node + "' does not exist. ");
}
return true;
}

public void run() {
while (true) {
try {
Thread.sleep(SLEEP_TIME);  //休息一会儿再拿数据
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
showDate(TEST_NODE_PATH);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}


总结:

zookeeper的用处远远不止于我以上写的那样,官方也没有说zookeeper一定要用于什么用途,说白了zookeeper就是一堵可以画画的墙,特性摆在这里,至于要画什么东西要看每个人需要它的特性做什么。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息