您的位置:首页 > 其它

zookeeper应用案例——服务器列表动态更新

2017-05-17 11:21 281 查看
需要的jar包:

zookeeper-3.4.5\zookeeper-3.4.5.jar

zookeeper-3.4.5\lib\所有jar包



DistributedServer.java:

public class DistributedServer {
//先在zookeeper服务器上创建一个/servers节点
private static final String groupNode = "/servers";

public static void waitUntilConnected(ZooKeeper testZooKeeper, CountDownLatch testLatch) {
if(testZooKeeper.getState() == States.CONNECTING) {
try {
testLatch.await();
} catch (InterruptedException err) {
System.out.println("Latch exception");
}
}
}

static class ConnectedWatcher implements Watcher {
private CountDownLatch connectedLatch;
ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;  /* CountDownLatch实例初始化时设为1即可 */
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown(); /* ZK连接成功时,计数器由1减为0 */
}
}
}

/**
* 用于向zookeeper集群注册本服务器节点的信息
*/
public void registerZK(String hostname) throws Exception{
CountDownLatch sampleLatch = new CountDownLatch(1);
Watcher sampleWatcher = new ConnectedWatcher (sampleLatch);
//创建一个zk客户端,定义一个监听器逻辑
ZooKeeper zkCli = new ZooKeeper("192.168.77.70:2181", 2000, sampleWatcher);

/* 只有当zkCli链接成功(状态为 SyncConnected)时,此函数调用才结束 */
waitUntilConnected(zkCli, sampleLatch);
/*接下来就可以继续zkCli访问了,避免因为zkCli未连接成功时的访问出错 */

//利用zk往zookeeper中创建一个临时znode   "/servers/server-randomid"
String path = zkCli.create(groupNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println("服务器"+hostname +"在zookeeper中注册了一个子节点: "+ path);
}

/**
* 模拟服务端的业务处理功能
*/
public void handle(String hostname) throws InterruptedException{
System.out.println("----服务器---" +hostname +"开始处理自己的业务了.......");
Thread.sleep(Long.MAX_VALUE);
}

public static void main(String[] args) throws Exception {
//调用registerZK方法往zk注册服务器信息
DistributedServer server = new DistributedServer();
server.registerZK(args[0]);

//处理自己的业务功能
server.handle(args[0]);
}
}


DistributedClient.java:

public class DistributedClient {
static ZooKeeper zkCli = null;
volatile static List<String> servers = null;

public static void waitUntilConnected(ZooKeeper testZooKeeper, CountDownLatch testLatch) {
if(testZooKeeper.getState() == States.CONNECTING) {
try {
testLatch.await();
} catch (InterruptedException err) {
System.out.println("Latch exception");
}
}
}

static class ConnectedWatcher implements Watcher {
private CountDownLatch connectedLatch;
ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;  /* CountDownLatch实例初始化时设为1即可 */
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown(); /* ZK连接成功时,计数器由1减为0 */
}
// 从zookeeper中获取最新的服务节点信息
try {
updateServers();
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 获取在线的服务节点
*/
public void getOnlineServers() throws Exception {
CountDownLatch sampleLatch = new CountDownLatch(1);
Watcher sampleWatcher = new ConnectedWatcher (sampleLatch);
// 构造一个zookeeper的客户端
zkCli = new ZooKeeper("192.168.77.70:2181", 2000, sampleWatcher);

/* 只有当zkCli链接成功(状态为 SyncConnected)时,此函数调用才结束 */
waitUntilConnected(zkCli, sampleLatch);
/*接下来就可以继续zkCli访问了,避免因为zkCli未连接成功时的访问出错 */

// 一进来就调用updateServers方法获取当前在线的服务节点
updateServers();
}

/**
* 从zookeeper中获取服务节点信息的具体实现方法
*/
public static void updateServers() throws Exception {
// 构造一个list用来保存服务节点信息
List<String> serverList = new ArrayList<String>();

// 先拿到servers下的子节点名称列表,并对父节点servers注册监听器
List<String> children = zkCli.getChildren("/servers", true);
//		 遍历获取每一个子节点所保存的数据——服务节点信息
for (String child : children) {
byte[] data = zkCli.getData("/servers/" + child, false, null);
String serverName = new String(data, "utf-8");
serverList.add(serverName);

System.out.println("当前在线的服务节点有: " + serverName);
}
servers = serverList;
System.out.println("---------服务节点信息更新完毕---------");
}

/**
* 模拟客户端程序的业务功能
*/
public void handle() throws Exception {
System.out.println("客户端开始处理自己的业务功能.......");
Thread.sleep(Long.MAX_VALUE);
}

public static void main(String[] args) throws Exception {
// 获取服务器列表
DistributedClient distributedClient = new DistributedClient();
distributedClient.getOnlineServers();

// 处理自己的业务功能
distributedClient.handle();
}
}


结果:

(1)先启动DistributedClient.java,然后再陆续启动三个DistributedServer.java,观察DistributedClient.java上的结果









(2)陆续关闭三个DistributedServer.java,观察DistributedClient.java上的结果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息