Zoookeeper_Java API操作zookeeper 通过zookeeper.jar
2016-11-11 14:14
471 查看
本文讲解下如何通过 zookeeper.jar 操作Zookeeper.
并给出一个例子:例子通过zookeeper记录文件读取的偏移量,当程序重启后,从上一次的断点接着读取。
Maven dependency
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
封装的Java 操作类
注意一点,Zookeeper.jar 本身是不提供同步连接的, 这里通过 CountDownLatch 将异步连接 转变为 同步连接。
封装类
例子,通过 Zookeeper记录状态
并给出一个例子:例子通过zookeeper记录文件读取的偏移量,当程序重启后,从上一次的断点接着读取。
Maven dependency
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
封装的Java 操作类
注意一点,Zookeeper.jar 本身是不提供同步连接的, 这里通过 CountDownLatch 将异步连接 转变为 同步连接。
封装类
package com.miaozhen.test.collect; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class ZookeeperBase implements Watcher { private static final int SESSION_TIME_OUT = 2000; private static final String basePath = "/request"; private CountDownLatch countDownLatch = new CountDownLatch(1); private ZooKeeper zookeeper = null; @Override public void process(WatchedEvent event) { if(event.getState()==KeeperState.SyncConnected){ System.out.println("Watch received event"); countDownLatch.countDown(); } } public ZookeeperBase(String host) throws IOException, InterruptedException{ this.zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); countDownLatch.await(); } //==================== 工具函数 ========================== public String pathChange(String path){ if(path.startsWith(ZookeeperBase.basePath)){ return path; }else{ return ZookeeperBase.basePath + path; } } //===================== 节点操作函数 ========================== public Boolean nodeExists(String path) throws KeeperException, InterruptedException{ path = this.pathChange(path); Stat stat = this.zookeeper.exists(path, false); return stat == null ? false : true; } public Boolean createNode(String path, String data) throws KeeperException, InterruptedException{ path = this.pathChange(path); if(!this.nodeExists(path)) { String listPath[] = path.split("/"); String prePath = ""; for(int i=1; i<listPath.length-1; i++){ prePath = prePath + "/" + listPath[i]; if(!this.nodeExists(prePath)){ this.zookeeper.create(prePath, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return true; }else{ return false; } } public String getData(String path) throws KeeperException, InterruptedException{ path = this.pathChange(path); if(this.nodeExists(path)) { return new String(this.zookeeper.getData(path, false, null)); }else{ return null; } } public Boolean setData(String path, String data) throws KeeperException, InterruptedException{ path = this.pathChange(path); if(this.nodeExists(path)){ this.zookeeper.setData(path, data.getBytes(), -1); return true; }else{ return false; } } public Boolean delNode(String path) throws InterruptedException, KeeperException{ path = this.pathChange(path); if(this.nodeExists(path)){ this.zookeeper.delete(path, -1); return true; }else{ return false; } } public List<String> getChilds(String path) throws KeeperException, InterruptedException{ path = this.pathChange(path); if(this.nodeExists(path)){ return this.zookeeper.getChildren(path, false); }else{ return null; } } public Integer getChildsNum(String path) throws KeeperException, InterruptedException{ path = this.pathChange(path); if(this.getChilds(path) == null){ return null; }else{ return this.getChilds(path).size(); } } public void closeConnection() throws InterruptedException{ if(this.zookeeper != null){ zookeeper.close(); } } }
例子,通过 Zookeeper记录状态
package com.miaozhen.test.collect; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLConnection; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.KeeperException; public class SendRequestFromDir { public static ZookeeperBase zookeeper = null; //zookeeper 的前缀路径 public static final String preDone = "/done"; public static final String preTodo = "/todo"; public static String requestUrl(String requestAddress){ InputStreamReader inputStreamReader = null; InputStream inputStream = null; BufferedReader reader = null; StringBuffer resultBuffer = new StringBuffer(); try { URL url = new URL(requestAddress); URLConnection connection = url.openConnection(); HttpURLConnection httpURLConnection = (HttpURLConnection)connection; inputStream = httpURLConnection.getInputStream(); inputStreamReader = new InputStreamReader(inputStream,"UTF-8"); reader = new BufferedReader(inputStreamReader); String tmpLine = null; while((tmpLine = reader.readLine()) !=null){ resultBuffer.append(tmpLine); } } catch (IOException e1) { e1.printStackTrace(); } finally{ try { if (null != reader) { reader.close(); } } catch (IOException e) { e.printStackTrace(); } } return resultBuffer.toString(); } //从头读到尾 public static void readFileAndSendRequest(String host, String filePath){ SendRequestToCollectServer.readFileAndSendRequest(host, filePath, 0, -1); } //读指定的行数 public static void readFileAndSendRequest(String host, String filePath, int start, int end) throws KeeperException, InterruptedException{ File file = new File(filePath); BufferedReader reader = null; try { reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8")); String href = null; String log = null; String suffix = null; //先读取start前面的行,并不做处理 for(int i=0; i<start; i++){ reader.readLine(); } Boolean infinite = false; if(end == -1){ infinite = true; } int offset = start; for(int i=0; ((i<=end-start) || infinite) && ((log=reader.readLine())!=null); i++,offset++){ //track-log String[] arguments = log.split(" - "); String trackArgument = arguments[3].trim().replaceAll("\"", ""); String[] track_args = trackArgument.split(" "); suffix = track_args[1]; href = host+suffix; String result = SendRequestToCollectServer.requestUrl(href); //TODO :Remove ??? System.out.println(result); //TODO //修改zookeeper文件偏移量 System.out.println(SendRequestFromDir.preTodo+filePath+ " : "+offset); SendRequestFromDir.zookeeper.setData(SendRequestFromDir.preTodo+filePath, String.valueOf(offset)); //每发送1000,休息2秒 //&& (i%1000 == 0) if(i != 0){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } //读到文件结尾 if(infinite && ((log=reader.readLine())==null)){ //TODO System.out.println("文件读取完毕:" + filePath); SendRequestFromDir.zookeeper.delNode(SendRequestFromDir.preTodo + filePath); SendRequestFromDir.zookeeper.createNode(SendRequestFromDir.preDone + filePath, "done"); } } catch (IOException e) { throw new RuntimeException(e); } finally { try { if (null != reader) { reader.close(); } } catch (IOException e) { e.printStackTrace(); } } } /** * 获取指定路径下的所有文件列表 * * @param dir 要查找的目录 * @return */ public static List<String> fileList(String Path){ List<String> fileList = new ArrayList<>(); File pathDir = new File(Path); if(pathDir.isDirectory()){ File[] files = pathDir.listFiles(); for(File file: files){ if(!file.isDirectory()){ fileList.add(file.getAbsolutePath()); }else{ //递归加载文件夹下的文件 fileList.addAll(SendRequestFromDir.fileList(file.getAbsolutePath())); } } } return fileList; } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { if(args.length < 1){ System.out.println("请输入参数"); System.out.println("参数一:日志的目录路径"); System.out.println("参数二:收集日志的地址"); return; } String filePath = args[0]; String host = args[1]; //========================= //========================= SendRequestFromDir.zookeeper = new ZookeeperBase("10.202.4.22:2181"); try{ List<String> files = SendRequestFromDir.fileList(filePath); for(int i=0; i<files.size(); i++){ //文件已经读取完毕,跳出本次循环 if(zookeeper.nodeExists(SendRequestFromDir.preDone+files.get(i))){ //TODO System.out.println("读取完成的文件--> : "+files.get(i)); continue; } //文件开始读 else if(!zookeeper.nodeExists(SendRequestFromDir.preTodo+files.get(i))){ //TODO System.out.println("zookeeper创建文件节点,开始读取 :"+files.get(i)); zookeeper.createNode(SendRequestFromDir.preTodo+files.get(i), "0"); SendRequestFromDir.readFileAndSendRequest(host, files.get(i), 0, -1); } //文件已经开始读,但还未读取完毕 else if(zookeeper.nodeExists(SendRequestFromDir.preTodo+files.get(i))){ String start = zookeeper.getData(SendRequestFromDir.preTodo+files.get(i)); //TODO System.out.println("文件已经被读取了一部分,继续读取 :"+files.get(i)); System.out.println("上一次的读取偏移量 :"+start); SendRequestFromDir.readFileAndSendRequest(host, files.get(i), (Integer.valueOf(start)+1), -1); } } }catch(Exception e){ e.printStackTrace(); }finally{ zookeeper.closeConnection(); } } }
相关文章推荐
- Java代码通过API操作HBase的最佳实践
- Java 通过Poi api操作(read/write)Excel
- zookeeper JavaAPI入门操作
- zookeeper JAVA API 简单操作
- java通过api对hadoop的操作
- 在集群中Java 通过调用API操作HBase 0.98
- hbase编程:通过Java api操作hbase
- Zookeeper学习之源生API的使用(java与shell操作zookeeper)。
- java_api操作zookeeper节点
- linux 下通过过 hbase 的Java api 操作hbase
- 在集群中java 通过调用API操作HBase 0.98
- jxl.jar 通过java操作excel表格的工具类库
- Zookeeper 命令行操作及Java api
- 使用Java客户端API操作Zookeeper
- C#通过Win32 API操作IE浏览器 --- 获得IE的URL
- jxl.jar包简介/java操作excel jxl.jar下载地址
- 通过oracle,调用java类,并加载jar包到oracle中以支持java类。
- 使用JAVA加jxl.jar操作EXECL
- 使用EJB3 Java 持久化API来标准化Java的持久化操作
- 使用EJB3 Java 持久化API来标准化Java的持久化操作