您的位置:首页 > 编程语言 > Java开发

Java 实时监听日志并写入 Kafka(多目录)

2017-05-25 17:51 381 查看

目的

实时监听多个目录下的日志文件:每个目录始终读取其中最新的文件,一旦出现更新的文件就切换过去,并将读到的日志逐行同步写入 Kafka。同时记录当前日志文件的路径和行号,这样进程异常退出后,重启时可以从上次记录的位置继续读取(考虑到效率,这里每 100 行记录一次位置,可调整)。

源码

[java] view plain copy

import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.io.FileWriter;

import java.io.IOException;

import java.io.LineNumberReader;

import java.util.ArrayList;

import java.util.Collection;

import java.util.HashMap;

import java.util.List;

import java.util.Properties;

import java.util.Random;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

;

public class XTail_Line {

public static class TailFileThread extends Thread

{

// Directory being watched; built from the constructor's path argument and listed by getNewFile().
File file;

// Line-counting reader over the file that is currently being tailed (opened in run()).
LineNumberReader randomFile=null;

// Not referenced by any code in this class.
String newfile=null;

// Path of the file selected for reading; run() also passes it to writePosition().
String thisfile=null;

// Path of the file selected on the previous pass; run() compares it with thisfile to detect a switch.
String prefile=null;

// Only referenced from a commented-out seek() call in run().
private long lastTimeFileSize = 0;

// Directory name; prefix of the "<drname>.position" file written by writePosition().
private String drname=null;

// Number of lines read since the position file was last written.
int ln=0;

// Line number restored from the position file; run() only sends lines numbered above it.
int beginln=0;

// Kafka producer used by both send() overloads and released by close().
private Producer<String,String> inner;

// Source of the random message keys generated in send(String, String).
java.util.Random ran = new Random();

// Kafka topic that run() sends every line to.
String topicname=null;

/**
 * Creates a tailing thread for one log directory and builds its Kafka producer.
 *
 * <p>The producer configuration is loaded from a file named {@code producer.properties},
 * resolved against the working directory of the process.
 *
 * @param path      directory whose most recently modified file will be tailed
 * @param drname    name of that directory; used as the prefix of the {@code .position} file
 * @param topicname Kafka topic that receives the lines
 * @throws FileNotFoundException if {@code producer.properties} cannot be opened
 * @throws IOException           if reading {@code producer.properties} fails
 */
public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException
{
    file = new File(path);
    this.drname = drname;
    this.topicname = topicname;

    Properties properties = new Properties();
    FileInputStream in = new FileInputStream("producer.properties");
    try {
        properties.load(in);
    } finally {
        // The stream was previously left open; one descriptor leaked per thread (re)start.
        in.close();
    }

    ProducerConfig config = new ProducerConfig(properties);
    inner = new Producer<String, String>(config);
}

/**
 * Sends a single message to Kafka under a random key.
 *
 * <p>Does nothing when either argument is {@code null}.
 *
 * @param topicName destination topic
 * @param message   payload to send
 */
public void send(String topicName,String message) {
    if (topicName != null && message != null) {
        // A random key (0..8) is used so that messages are hashed across the partitions.
        String key = String.valueOf(ran.nextInt(9));
        inner.send(new KeyedMessage<String, String>(topicName, key, message));
    }
}

/**
 * Sends a batch of messages to Kafka in one producer call; the messages carry no key.
 *
 * <p>Does nothing when the topic is {@code null} or the collection is {@code null} or empty.
 *
 * @param topicName destination topic
 * @param messages  payloads to send
 */
public void send(String topicName,Collection<String> messages) {
    boolean nothingToSend = topicName == null || messages == null || messages.isEmpty();
    if (nothingToSend) {
        return;
    }

    List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
    for (String payload : messages) {
        batch.add(new KeyedMessage<String, String>(topicName, payload));
    }
    inner.send(batch);
}

/**
 * Closes the Kafka producer owned by this thread.
 */
public void close()
{
    this.inner.close();
}

/**
 * Finds the most recently modified regular file directly inside a directory.
 *
 * <p>Sub-directories are ignored.
 *
 * @param file directory to scan
 * @return absolute path of the newest regular file, or an empty string when the directory
 *         contains no regular file, does not exist, is not a directory, or cannot be listed
 */
public String getNewFile(File file)
{
    File[] entries = file.listFiles();
    if (entries == null) {
        // listFiles() returns null (not an empty array) when the path is not a directory
        // or an I/O error occurs; report "no file" instead of failing with an NPE.
        return "";
    }

    long newestTime = 0;
    String newestPath = "";
    for (File entry : entries) {
        if (!entry.isFile()) {
            continue;
        }
        // Read the timestamp once so the comparison and the stored value cannot disagree
        // if the file is modified while the directory is being scanned.
        long modified = entry.lastModified();
        if (modified > newestTime) {
            newestTime = modified;
            newestPath = entry.getAbsolutePath();
        }
    }
    return newestPath;
}

/**
 * Records the current read position as {@code <path>,<line number>} in the file
 * {@code <drname>.position}, replacing any previous content.
 *
 * <p>This is best effort: a failure is reported on standard error and otherwise ignored,
 * so tailing continues even when the position cannot be saved.
 *
 * @param path path of the log file being read
 * @param rn   number of the last line read from that file
 */
public void writePosition(String path,int rn)
{
    String positionFile = drname + ".position";
    BufferedWriter out = null;
    try {
        out = new BufferedWriter(new FileWriter(positionFile));
        out.write(path + "," + rn);
    } catch (IOException e) {
        // Previously swallowed silently, which hid the fact that restarts would not resume.
        System.err.println("Failed to write position file " + positionFile + ": " + e);
    } finally {
        if (out != null) {
            try {
                // close() also flushes the buffer, so a failure here means the position was lost.
                out.close();
            } catch (IOException e) {
                System.err.println("Failed to close position file " + positionFile + ": " + e);
            }
        }
    }
}

/**
 * Tails the newest file of the watched directory and forwards every line to Kafka.
 *
 * <p>On start-up the position file {@code <drname>.position} is consulted; when it names a
 * file and a line number, reading resumes in that file and lines up to that number are
 * skipped. Otherwise the most recently modified file is read from its first line. The
 * directory is polled every 100 ms: new lines are sent, the position is saved every
 * 100 lines, and when a different file becomes the newest one the reader switches to it.
 *
 * <p>The loop ends when the thread is interrupted. The reader and the Kafka producer are
 * always released when the method exits, whether normally or through an exception.
 *
 * <p>NOTE(review): {@code readLine()} also returns a final fragment that has no line
 * terminator yet, so a line that is only partly written when it is read would be sent in
 * two pieces — confirm that the log writers always append whole lines.
 *
 * @throws RuntimeException wrapping the {@link IOException} if reading a log file fails
 */
public void run()
{
    try {
        thisfile = getNewFile(file);
        prefile = thisfile;
        restorePosition();

        while (!isInterrupted()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag when it throws. Restore it and stop;
                // the old code logged the exception and kept looping forever.
                Thread.currentThread().interrupt();
                break;
            }
            try {
                pollOnce();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Interrupted...");
    } finally {
        closeReaderQuietly();
        // Release the producer: main() builds a new thread (and producer) after this one dies.
        close();
    }
}

/** Loads the saved file path and line number from the position file, if one exists and is well formed. */
private void restorePosition()
{
    File positionFile = new File(drname + ".position");
    if (!positionFile.isFile()) {
        return; // first run for this directory: nothing to restore
    }
    BufferedReader br = null;
    try {
        br = new BufferedReader(new FileReader(positionFile));
        String line = br.readLine();
        if (line == null) {
            return;
        }
        // Split on the LAST comma so that a log path containing commas still parses.
        int comma = line.lastIndexOf(',');
        if (comma <= 0) {
            return;
        }
        int savedLine = Integer.parseInt(line.substring(comma + 1).trim());
        thisfile = line.substring(0, comma);
        prefile = thisfile;
        beginln = savedLine;
    } catch (NumberFormatException e) {
        System.err.println("Ignoring malformed position file " + positionFile + ": " + e);
    } catch (IOException e) {
        System.err.println("Cannot read position file " + positionFile + ": " + e);
    } finally {
        if (br != null) {
            try {
                br.close();
            } catch (IOException ignored) {
                // nothing useful can be done about a failed close of a read-only file
            }
        }
    }
}

/** Performs one polling pass: opens a file if none is open, sends new lines, then follows a newer file. */
private void pollOnce() throws IOException
{
    if (randomFile == null && !openCurrentFile()) {
        return; // nothing to read yet; try again on the next pass
    }
    drainAvailableLines();

    String newest = getNewFile(file);
    if (newest.length() > 0 && !newest.equals(thisfile)) {
        // Pick up anything appended to the old file between the read above and the switch.
        drainAvailableLines();
        randomFile.close();
        randomFile = null;
        thisfile = newest;
        beginln = 0;
        openCurrentFile();
    }
}

/** Opens {@code thisfile}, falling back to the newest file (read from line 0) when it is unset or gone. */
private boolean openCurrentFile()
{
    if (thisfile == null || thisfile.length() == 0 || !new File(thisfile).isFile()) {
        String newest = getNewFile(file);
        if (newest.length() == 0) {
            return false; // directory has no regular file yet
        }
        thisfile = newest;
        beginln = 0; // a saved line number only applies to the file it was recorded for
    }
    try {
        randomFile = new LineNumberReader(new FileReader(thisfile));
    } catch (FileNotFoundException e) {
        System.err.println("Cannot open " + thisfile + ": " + e);
        return false;
    }
    prefile = thisfile;
    return true;
}

/** Sends every line currently available from the open reader and saves the position every 100 lines. */
private void drainAvailableLines() throws IOException
{
    String line;
    while ((line = randomFile.readLine()) != null) {
        int currln = randomFile.getLineNumber();
        // Lines numbered up to beginln were already sent before the restart.
        if (currln > beginln) {
            // The line is sent as read. The former new String(line.getBytes("utf8")) decoded
            // with the platform charset and corrupted non-ASCII text on non-UTF-8 platforms.
            send(topicname, line);
        }
        ln++;
        // Saving after every line would be too slow, so save once per 100 lines read.
        if (ln >= 100) {
            writePosition(thisfile, currln);
            ln = 0;
        }
    }
}

/** Closes the current log reader, if any, ignoring close failures. */
private void closeReaderQuietly()
{
    if (randomFile != null) {
        try {
            randomFile.close();
        } catch (IOException ignored) {
            // the thread is exiting; a failed close of a read-only file is harmless
        }
        randomFile = null;
    }
}

}

/**
 * Entry point: watches the sub-directories of a root directory and keeps one
 * {@code TailFileThread} running for each selected sub-directory.
 *
 * <p>The root is re-scanned every 100 ms. A sub-directory is selected when its name contains
 * {@code "xx"}, {@code "yy"}, {@code "zz"} or {@code "aa"}. A thread is started for a selected
 * sub-directory that has none yet, and a replacement is started when its thread has died.
 * The sub-directory name is also used as the name of that thread's position file.
 *
 * @param args {@code args[0]} is the Kafka topic, {@code args[1]} the root directory
 * @throws Exception if a tailing thread cannot be created or the wait between scans is interrupted
 */
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.out.println("usage:topicname pathname");
        System.exit(1);
    }
    String topicname = args[0];
    String pathname = args[1];

    File root = new File(pathname);
    if (!root.isDirectory()) {
        // Previously this surfaced as a NullPointerException from listFiles() below.
        System.err.println("not a directory: " + pathname);
        System.out.println("usage:topicname pathname");
        System.exit(1);
    }

    HashMap<String, TailFileThread> workers = new HashMap<String, TailFileThread>();
    while (true) {
        File[] children = root.listFiles();
        // listFiles() returns null on an I/O error or if the root disappears; skip this pass.
        if (children != null) {
            for (File child : children) {
                if (!child.isDirectory()) {
                    continue;
                }
                String drname = child.getName();
                boolean selected = drname.contains("xx") || drname.contains("yy")
                        || drname.contains("zz") || drname.contains("aa");
                if (!selected) {
                    continue;
                }
                String path = child.getAbsolutePath();
                TailFileThread existing = workers.get(path);
                // Start a worker when none exists yet, or replace one whose thread has died.
                if (existing == null || !existing.isAlive()) {
                    TailFileThread tt = new TailFileThread(path, drname, topicname);
                    tt.start();
                    workers.put(path, tt);
                }
            }
        }
        Thread.sleep(100);
    }
}

}

转:http://blog.csdn.net/u011750989/article/details/21957741
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: