您的位置:首页 > 编程语言 > Java开发

Zoookeeper_Java API操作zookeeper 通过zookeeper.jar

2016-11-11 14:14 471 查看
本文讲解下如何通过 zookeeper.jar 操作Zookeeper.

并给出一个例子:例子通过zookeeper记录文件读取的偏移量,当程序重启后,从上一次的断点接着读取。

Maven dependency

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>

封装的Java 操作类

注意一点,Zookeeper.jar 本身是不提供同步连接的, 这里通过 CountDownLatch 将异步连接 转变为 同步连接。

封装类

package com.miaozhen.test.collect;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZookeeperBase implements Watcher {

private static final int SESSION_TIME_OUT = 2000;
private static final String basePath = "/request";
private CountDownLatch countDownLatch = new CountDownLatch(1);
private ZooKeeper zookeeper = null;

@Override
public void process(WatchedEvent event) {
if(event.getState()==KeeperState.SyncConnected){
System.out.println("Watch received event");
countDownLatch.countDown();
}
}

public ZookeeperBase(String host) throws IOException, InterruptedException{
this.zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
}

//==================== 工具函数  ==========================
public String pathChange(String path){
if(path.startsWith(ZookeeperBase.basePath)){
return path;
}else{
return ZookeeperBase.basePath + path;
}
}

//===================== 节点操作函数 ==========================

public Boolean nodeExists(String path) throws KeeperException, InterruptedException{
path = this.pathChange(path);
Stat stat = this.zookeeper.exists(path, false);
return stat == null ? false : true;
}

public Boolean createNode(String path, String data) throws KeeperException, InterruptedException{
path = this.pathChange(path);
if(!this.nodeExists(path)) {
String listPath[] = path.split("/");
String prePath = "";
for(int i=1; i<listPath.length-1; i++){
prePath = prePath + "/" + listPath[i];
if(!this.nodeExists(prePath)){
this.zookeeper.create(prePath, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return true;
}else{
return false;
}
}

public String getData(String path) throws KeeperException, InterruptedException{
path = this.pathChange(path);
if(this.nodeExists(path)) {
return new String(this.zookeeper.getData(path, false, null));
}else{
return null;
}
}

public Boolean setData(String path, String data) throws KeeperException, InterruptedException{
path = this.pathChange(path);
if(this.nodeExists(path)){
this.zookeeper.setData(path, data.getBytes(), -1);
return true;
}else{
return false;
}
}

public Boolean delNode(String path) throws InterruptedException, KeeperException{
path = this.pathChange(path);
if(this.nodeExists(path)){
this.zookeeper.delete(path, -1);
return true;
}else{
return false;
}
}

public List<String> getChilds(String path) throws KeeperException, InterruptedException{
path = this.pathChange(path);
if(this.nodeExists(path)){
return this.zookeeper.getChildren(path, false);
}else{
return null;
}
}

public Integer getChildsNum(String path) throws KeeperException, InterruptedException{
path = this.pathChange(path);
if(this.getChilds(path) == null){
return null;
}else{
return this.getChilds(path).size();
}
}

public void closeConnection() throws InterruptedException{
if(this.zookeeper != null){
zookeeper.close();
}
}
}


例子,通过 Zookeeper记录状态

package com.miaozhen.test.collect;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;

public class SendRequestFromDir {

public static ZookeeperBase zookeeper = null;

//zookeeper 的前缀路径
public static final String preDone = "/done";
public static final String preTodo = "/todo";

public static String requestUrl(String requestAddress){

InputStreamReader inputStreamReader = null;
InputStream inputStream = null;
BufferedReader reader = null;
StringBuffer resultBuffer = new StringBuffer();

try {
URL url = new URL(requestAddress);
URLConnection connection = url.openConnection();
HttpURLConnection httpURLConnection = (HttpURLConnection)connection;

inputStream = httpURLConnection.getInputStream();
inputStreamReader = new InputStreamReader(inputStream,"UTF-8");
reader = new BufferedReader(inputStreamReader);

String tmpLine = null;
while((tmpLine = reader.readLine()) !=null){
resultBuffer.append(tmpLine);
}
} catch (IOException e1) {
e1.printStackTrace();
}
finally{
try {
if (null != reader) {
reader.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}

return resultBuffer.toString();
}

//从头读到尾
public static void readFileAndSendRequest(String host, String filePath){
SendRequestToCollectServer.readFileAndSendRequest(host, filePath, 0, -1);
}

//读指定的行数
public static void readFileAndSendRequest(String host, String filePath, int start, int end) throws KeeperException, InterruptedException{

File file = new File(filePath);
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
String href = null;
String log = null;
String suffix = null;

//先读取start前面的行,并不做处理
for(int i=0; i<start; i++){
reader.readLine();
}

Boolean infinite = false;
if(end == -1){
infinite = true;
}

int offset = start;
for(int i=0; ((i<=end-start) || infinite) && ((log=reader.readLine())!=null); i++,offset++){

//track-log
String[] arguments = log.split(" - ");
String trackArgument = arguments[3].trim().replaceAll("\"", "");
String[] track_args = trackArgument.split(" ");
suffix = track_args[1];
href = host+suffix;

String result = SendRequestToCollectServer.requestUrl(href);
//TODO :Remove ???
System.out.println(result);

//TODO
//修改zookeeper文件偏移量
System.out.println(SendRequestFromDir.preTodo+filePath+ " : "+offset);
SendRequestFromDir.zookeeper.setData(SendRequestFromDir.preTodo+filePath, String.valueOf(offset));

//每发送1000,休息2秒
//&& (i%1000 == 0)
if(i != 0){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//读到文件结尾
if(infinite && ((log=reader.readLine())==null)){
//TODO
System.out.println("文件读取完毕:" + filePath);

SendRequestFromDir.zookeeper.delNode(SendRequestFromDir.preTodo + filePath);
SendRequestFromDir.zookeeper.createNode(SendRequestFromDir.preDone + filePath, "done");
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (null != reader) {
reader.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}

}

/**
* 获取指定路径下的所有文件列表
*
* @param dir 要查找的目录
* @return
*/
public static List<String> fileList(String Path){
List<String> fileList = new ArrayList<>();

File pathDir = new File(Path);
if(pathDir.isDirectory()){
File[] files = pathDir.listFiles();
for(File file: files){
if(!file.isDirectory()){
fileList.add(file.getAbsolutePath());
}else{
//递归加载文件夹下的文件
fileList.addAll(SendRequestFromDir.fileList(file.getAbsolutePath()));
}
}
}
return fileList;
}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

if(args.length < 1){
System.out.println("请输入参数");
System.out.println("参数一:日志的目录路径");
System.out.println("参数二:收集日志的地址");
return;
}

String filePath = args[0];
String host = args[1];

//=========================
//=========================

SendRequestFromDir.zookeeper = new ZookeeperBase("10.202.4.22:2181");
try{

List<String> files = SendRequestFromDir.fileList(filePath);
for(int i=0; i<files.size(); i++){
//文件已经读取完毕,跳出本次循环
if(zookeeper.nodeExists(SendRequestFromDir.preDone+files.get(i))){
//TODO
System.out.println("读取完成的文件-->  : "+files.get(i));

continue;
}
//文件开始读
else if(!zookeeper.nodeExists(SendRequestFromDir.preTodo+files.get(i))){
//TODO
System.out.println("zookeeper创建文件节点,开始读取 :"+files.get(i));

zookeeper.createNode(SendRequestFromDir.preTodo+files.get(i), "0");
SendRequestFromDir.readFileAndSendRequest(host, files.get(i), 0, -1);
}
//文件已经开始读,但还未读取完毕
else if(zookeeper.nodeExists(SendRequestFromDir.preTodo+files.get(i))){
String start = zookeeper.getData(SendRequestFromDir.preTodo+files.get(i));
//TODO
System.out.println("文件已经被读取了一部分,继续读取 :"+files.get(i));
System.out.println("上一次的读取偏移量 :"+start);

SendRequestFromDir.readFileAndSendRequest(host, files.get(i), (Integer.valueOf(start)+1), -1);
}
}

}catch(Exception e){
e.printStackTrace();
}finally{
zookeeper.closeConnection();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: