java实时监听日志写入kafka(多目录)
2014-04-06 20:15
453 查看
目的
实时监听多个目录下的日志文件,如有新文件则切换到新文件,并同步写入 Kafka;同时记录日志文件的行位置,以应对进程异常退出,可从上次记录的位置继续读取(考虑到效率,这里是每 100 条记录一次,可调整)。源码如下:
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.LineNumberReader; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; ; public class XTail_Line { public static class TailFileThread extends Thread { File file; LineNumberReader randomFile=null; String newfile=null; String thisfile=null; String prefile=null; private long lastTimeFileSize = 0; private String drname=null; int ln=0; int beginln=0; private Producer<String,String> inner; java.util.Random ran = new Random(); String topicname=null; public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException { file=new File(path); this.drname=drname; this.topicname=topicname; Properties properties = new Properties(); // properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); properties.load(new FileInputStream("producer.properties")); ProducerConfig config = new ProducerConfig(properties); inner = new Producer<String, String>(config); } public void send(String topicName,String message) { if(topicName == null || message == null){ return; } // KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message); //随机作为key,hash分散到各个分区 KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message); // KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message); inner.send(km); } public void send(String topicName,Collection<String> messages) { if(topicName == null || messages == null){ return; } if(messages.isEmpty()){ return; } 
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(); for(String entry : messages){ KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry); kms.add(km); } inner.send(kms); } public void close(){ inner.close(); } public String getNewFile(File file) { File[] fs=file.listFiles(); long maxtime=0; String newfilename=""; for (int i=0;i<fs.length;i++) { if (fs[i].isFile()&&fs[i].lastModified()>maxtime) { maxtime=fs[i].lastModified(); newfilename=fs[i].getAbsolutePath(); } } return newfilename; } //写入文件名及行号 public void writePosition(String path,int rn) { try { BufferedWriter out = new BufferedWriter(new FileWriter(drname+".position")); out.write(path+","+rn); out.close(); } catch (IOException e) { } } public void run() { thisfile=getNewFile(file); prefile=thisfile; //访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件 try { BufferedReader br=new BufferedReader(new FileReader(drname+".position")); String line=br.readLine(); if (line!=null &&line.contains(",")) { thisfile=line.split(",")[0]; prefile=thisfile; beginln=Integer.parseInt(line.split(",")[1]); } } catch (FileNotFoundException e2) { // TODO Auto-generated catch block e2.printStackTrace(); } catch (IOException e2) { // TODO Auto-generated catch block e2.printStackTrace(); } //指定文件可读可写 try { randomFile = new LineNumberReader(new FileReader(thisfile)); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } while (true) { try { Thread.sleep(100); //调用interrupt方法后 if(isInterrupted()) { System.out.println("Interrupted..."); break; } } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } try { //获得变化部分的 // randomFile.seek(lastTimeFileSize); String tmp = ""; while( (tmp = randomFile.readLine())!= null) { int currln=randomFile.getLineNumber(); //beginln默认为0 if (currln>beginln) send(topicname,new String(tmp.getBytes("utf8"))); ln++; //每发生一条写一次影响效率 if (ln>100) { writePosition(thisfile,currln); 
ln=0; } } thisfile=getNewFile(file); if(!thisfile.equals(prefile)) { randomFile.close(); randomFile = new LineNumberReader(new FileReader(thisfile)); prefile=thisfile; beginln=0; } } catch (IOException e) { throw new RuntimeException(e); } } } } public static void main(String[] args) throws Exception { /* LogView view = new LogView(); final File tmpLogFile = new File("D:\\test.txt"); view.realtimeShowLog(tmpLogFile); */ if (args.length!=2) { System.out.println("usage:topicname pathname"); System.exit(1); } String topicname=args[0]; String pathname=args[1]; HashMap<String,TailFileThread> hm=new HashMap<String,TailFileThread>(); File tmpLogFile = new File(pathname); File[] fs=tmpLogFile.listFiles(); while (true) { fs=tmpLogFile.listFiles(); for (int i=0;i<fs.length;i++) { if(fs[i].isDirectory()) { String path=fs[i].getAbsolutePath(); //以drname作为position文件名 String drname=fs[i].getName(); //如果该目录对应的处理线程已经存在,判断是否存活 if (drname.contains("xx") || drname.contains("yy") || drname.contains("zz") || drname.contains("aa") ) { if (hm.containsKey(path)) { if (!hm.get(path).isAlive()) { hm.get(path).interrupt(); TailFileThread tt=new TailFileThread(path,drname,topicname); tt.start(); hm.put(path, tt); } } //如果不存在,新建 else { TailFileThread tt=new TailFileThread(path,drname,topicname); tt.start(); hm.put(path, tt); } } } //System.out.println(fs[i].getAbsolutePath()); } Thread.sleep(100); } } }
相关文章推荐
- java实时监听日志写入kafka(多目录)
- java实时监听日志写入kafka(转)
- java实时监听日志写入kafka
- java实时监听日志写入kafka
- java实时监听日志写入kafka(转)
- 使用Log4j将程序日志实时写入Kafka
- 使用Log4j将程序日志实时写入Kafka
- Java实现系统目录实时监听更新。
- Flume读取日志数据并写入到Kafka,ConsoleConsumer进行实时消费
- 使用Log4j将程序日志实时写入Kafka(转)
- 实时监听文件写入kafka
- Java实现系统目录实时监听更新。
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码
- 将Log4j日志实时写入Kafka
- 使用Log4j将程序日志实时写入Kafka
- 利用Maxwell组件实时监听Mysql的binlog日志,并且把解析的json格式数据发送到kafka窗口供实时消费
- 使用Log4j将程序日志实时写入Kafka
- flume读取日志数据写入kafka 然后kafka+storm整合
- 【Java】【日志】大型开源日志系统比较 Flume Scribe Chukwa Kafka