您的位置:首页 > 编程语言 > Java开发

(四)、ZooKeeper Java示例

2016-08-05 12:43 417 查看
利用ZooKeeper简单的创建节点信息。项目使用的是Maven工程。工程结构如下:



POM文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>com.lyh.zk</groupId>
<artifactId>ZooKeeper01-javaClient</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>ZooKeeper01-javaClient</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-test</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
创建ZooKeeper客户端,MyZooKeeper实现Watcher接口。

package com.lyh.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
* ZooKeeper监听
* @author liuyuehu
*/
public class MyZooKeeper implements Watcher {

Logger logger = Logger.getLogger(MyZooKeeper.class);

protected CountDownLatch countDownLatch = new CountDownLatch(1);
//缓存时间
private static final int SESSION_TIME = 2000;
public static ZooKeeper zooKeeper = null;

/**
* 监控所有被触发的事件
*/
public void process(WatchedEvent event) {
logger.info("收到事件通知:" + event.getState() );
if(event.getState()==KeeperState.SyncConnected){
countDownLatch.countDown();
}
}

/**
* <p>连接Zookeeper</p>
* <pre>
*     [关于connectString服务器地址配置]
*     格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
*     这个地址配置有多个ip:port之间逗号分隔,底层操作
*     ConnectStringParser connectStringParser =  new ConnectStringParser(“192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181”);
*     这个类主要就是解析传入地址列表字符串,将其它保存在一个ArrayList中
*     ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
*     接下去,这个地址列表会被进一步封装成StaticHostProvider对象,并且在运行过程中,一直是这个对象来维护整个地址列表。
*     ZK客户端将所有Server保存在一个List中,然后随机打乱(这个随机过程是一次性的),并且形成一个环,具体使用的时候,从0号位开始一个一个使用。
*     因此,Server地址能够重复配置,这样能够弥补客户端无法设置Server权重的缺陷,但是也会加大风险。
*     [客户端和服务端会话说明]
*     ZooKeeper中,客户端和服务端建立连接后,会话随之建立,生成一个全局唯一的会话ID(Session ID)。
*     服务器和客户端之间维持的是一个长连接,在SESSION_TIMEOUT时间内,服务器会确定客户端是否正常连接(客户端会定时向服务器发送heart_beat,服务器重置下次SESSION_TIMEOUT时间)。
*     因此,在正常情况下,Session一直有效,并且ZK集群所有机器上都保存这个Session信息。
*     在出现网络或其它问题情况下(例如客户端所连接的那台ZK机器挂了,或是其它原因的网络闪断),客户端与当前连接的那台服务器之间连接断了,
*     这个时候客户端会主动在地址列表(实例化ZK对象的时候传入构造方法的那个参数connectString)中选择新的地址进行连接。
*     [会话时间]
*     客户端并不是可以随意设置这个会话超时时间,在ZK服务器端对会话超时时间是有限制的,主要是minSessionTimeout和maxSessionTimeout这两个参数设置的。
*     如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。 默认的Session超时时间是在2 * tickTime ~ 20 * tickTime
* </pre>
* @param connectString  Zookeeper服务地址
* @param sessionTimeout Zookeeper连接超时时间
*/
public void connect(String hosts){
try {
if(zooKeeper == null){
// ZK客户端允许我们将ZK服务器的所有地址都配置在这里
zooKeeper = new ZooKeeper(hosts,SESSION_TIME,this);
// 使用CountDownLatch.await()的线程(当前线程)阻塞直到所有其它拥有
//CountDownLatch的线程执行完毕(countDown()结果为0)
countDownLatch.await();
}
} catch (IOException e) {
logger.error("连接创建失败,发生 InterruptedException , e " + e.getMessage(), e);
} catch (InterruptedException e) {
logger.error( "连接创建失败,发生 IOException , e " + e.getMessage(), e );
}
}

/**
* 关闭连接
*/
public void close(){
try {
if(zooKeeper != null){
zooKeeper.close();
}
} catch (InterruptedException e) {
logger.error("release connection error ," + e.getMessage() ,e);
}
}
}
创建一个类,ZKOpeate集中操作ZooKeeper的节点信息。

package com.lyh.zk;

import java.util.List;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class ZKOperate {

Logger logger = Logger.getLogger(MyZooKeeper.class);

MyZooKeeper myZooKeeper = new MyZooKeeper();

/**
* <p>创建zNode节点, String create(path<节点路径>, data[]<节点内容>, List(ACL访问控制列表), CreateMode<zNode创建类型>) </p><br/>
* <pre>
*     节点创建类型(CreateMode)
*     1、PERSISTENT:持久化节点
*     2、PERSISTENT_SEQUENTIAL:顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
*     3、EPHEMERAL:临时节点客户端,session超时这类节点就会被自动删除
*     4、EPHEMERAL_SEQUENTIAL:临时自动编号节点
* </pre>
* @param path zNode节点路径
* @param data zNode数据内容
* @return 创建成功返回true, 反之返回false.
*/
public boolean createZNode(String path,String data){
try {
String zkPath = MyZooKeeper.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info("ZooKeeper创建节点成功,节点地址:" + zkPath);
return true;
} catch (KeeperException e) {
logger.error("创建节点失败:" + e.getMessage() + ",path:" + path  ,e);
} catch (InterruptedException e) {
logger.error("创建节点失败:" + e.getMessage() + ",path:" + path  ,e);
}
return false;
}

/**
* <p>删除一个zMode节点, void delete(path<节点路径>, stat<数据版本号>)</p><br/>
* <pre>
*     说明
*     1、版本号不一致,无法进行数据删除操作.
*     2、如果版本号与znode的版本号不一致,将无法删除,是一种乐观加锁机制;如果将版本号设置为-1,不会去检测版本,直接删除.
* </pre>
* @param path zNode节点路径
* @return 删除成功返回true,反之返回false.
*/
public boolean deteleZNode(String path){
try {
MyZooKeeper.zooKeeper.delete(path, -1);
logger.info("ZooKeeper删除节点成功,节点地址:" + path);
return true;
} catch (InterruptedException e) {
logger.error("删除节点失败:" + e.getMessage() + ",path:" + path ,e);
} catch (KeeperException e) {
logger.error("删除节点失败:" + e.getMessage() + ",path:" + path  ,e);
}
return false;
}

/**
* <p>更新指定节点数据内容, Stat setData(path<节点路径>, data[]<节点内容>, stat<数据版本号>)</p>
* <pre>
*     设置某个znode上的数据时如果为-1,跳过版本检查
* </pre>
* @param path zNode节点路径
* @param data zNode数据内容
* @return 更新成功返回true,返回返回false
*/
public boolean updateZNodeData(String path,String data){
try {
Stat stat = MyZooKeeper.zooKeeper.setData(path, data.getBytes(), -1);
logger.info( "更新数据成功, path:" + path + ", stat: " + stat );
return true;
} catch (KeeperException e) {
logger.error("更新节点数据失败:" + e.getMessage() + ",path:" + path ,e);
} catch (InterruptedException e) {
logger.error("更新节点数据失败:" + e.getMessage() + ",path:" + path ,e);
}
return false;
}

/**
* <p>读取指定节点数据内容,byte[] getData(path<节点路径>, watcher<监视器>, stat<数据版本号>)</p>
* @param path zNode节点路径
* @return 节点存储的值,有值返回,无值返回null
*/
public String readData( String path ){
String data = null;
try {
data = new String( MyZooKeeper.zooKeeper.getData( path, false, null ) );
logger.info( "读取数据成功, path:" + path + ", content:" + data);
} catch (KeeperException e) {
logger.error( "读取数据失败,发生KeeperException! path: " + path
+ ", errMsg:" + e.getMessage(), e );
} catch (InterruptedException e) {
logger.error( "读取数据失败,发生InterruptedException! path: " + path
+ ", errMsg:" + e.getMessage(), e );
}
return  data;
}

/**
* <p>获取某个节点下的所有子节点,List getChildren(path<节点路径>, watcher<监视器>)该方法有多个重载</p>
* @param path zNode节点路径
* @return 子节点路径集合 说明,这里返回的值为节点名
* <pre>
*     eg.
*     /node
*     /node/child1
*     /node/child2
*     getChild( "node" )户的集合中的值为["child1","child2"]
* </pre>
* @throws KeeperException
* @throws InterruptedException
*/
public List<String> getChild( String path ){
try{
List<String> list=MyZooKeeper.zooKeeper.getChildren( path, false );
if(list.isEmpty()){
logger.info( "中没有节点" + path );
}
return list;
}catch (KeeperException e) {
logger.error( "读取子节点数据失败,发生KeeperException! path: " + path
+ ", errMsg:" + e.getMessage(), e );
} catch (InterruptedException e) {
logger.error( "读取子节点数据失败,发生InterruptedException! path: " + path
+ ", errMsg:" + e.getMessage(), e );
}
return null;
}

/**
* <p>判断某个zNode节点是否存在, Stat exists(path<节点路径>, watch<并设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper 实例时指定的 watcher>)</p>
* @param path zNode节点路径
* @return 存在返回true,反之返回false
*/
public boolean isExists( String path ){
try {
Stat stat = MyZooKeeper.zooKeeper.exists( path, false );
return null != stat;
} catch (KeeperException e) {
logger.error( "读取数据失败,发生KeeperException! path: " + path
+ ", errMsg:" + e.getMessage(), e );
} catch (InterruptedException e) {
logger.error( "读取数据失败,发生InterruptedException! path: " + path
+ ", errMsg:" + e.getMessage(), e );
}
return false;
}
}
客户端测试:ZKClientTest.java

package com.lyh.zk;

import java.util.List;

/**
* 客户端测试
* @author liuyuehu
*/
public class ZKClientTest {

public static void main(String[] args) {
// 定义父子类节点路径
String rootPath = "/TestZookeeper";
String child1Path = rootPath + "/hello1";
String child2Path = rootPath + "/word1";

//ZKOperate操作API
ZKOperate zkWatchAPI = new ZKOperate();

// 连接zk服务器
MyZooKeeper zooKeeper = new MyZooKeeper();
zooKeeper.connect("127.0.0.1:2181");

// 创建节点数据
if ( zkWatchAPI.createZNode(rootPath, "<父>节点数据" ) ) {
System.out.println( "节点[" + rootPath + "]数据内容[" + zkWatchAPI.readData( rootPath ) + "]" );
}
// 创建子节点, 读取 + 删除
if ( zkWatchAPI.createZNode( child1Path, "<父-子(1)>节点数据" ) ) {
System.out.println( "节点[" + child1Path + "]数据内容[" + zkWatchAPI.readData( child1Path ) + "]" );
zkWatchAPI.deteleZNode(child1Path);
System.out.println( "节点[" + child1Path + "]删除值后[" + zkWatchAPI.readData( child1Path ) + "]" );
}

// 创建子节点, 读取 + 修改
if ( zkWatchAPI.createZNode(child2Path, "<父-子(2)>节点数据" ) ) {
System.out.println( "节点[" + child2Path + "]数据内容[" + zkWatchAPI.readData( child2Path ) + "]" );
zkWatchAPI.updateZNodeData(child2Path, "<父-子(2)>节点数据,更新后的数据" );
System.out.println( "节点[" + child2Path+ "]数据内容更新后[" + zkWatchAPI.readData( child2Path ) + "]" );
}

// 获取子节点
List<String> childPaths = zkWatchAPI.getChild(rootPath);
if(null != childPaths){
System.out.println( "节点[" + rootPath + "]下的子节点数[" + childPaths.size() + "]" );
for(String childPath : childPaths){
System.out.println(" |--节点名[" +  childPath +  "]");
}
}
// 判断节点是否存在
System.out.println( "检测节点[" + rootPath + "]是否存在:" + zkWatchAPI.isExists(rootPath)  );
System.out.println( "检测节点[" + child1Path + "]是否存在:" + zkWatchAPI.isExists(child1Path)  );
System.out.println( "检测节点[" + child2Path + "]是否存在:" + zkWatchAPI.isExists(child2Path)  );

zooKeeper.close();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息