您的位置:首页 > 编程语言 > Java开发

java实时监听日志写入kafka(多目录)

2014-04-06 20:15 453 查看


目的

实时监听多个目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)

源码

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
;

public class XTail_Line {

public static class TailFileThread extends Thread
{
File file;
LineNumberReader randomFile=null;
String newfile=null;
String thisfile=null;
String prefile=null;
private long lastTimeFileSize = 0;
private String drname=null;
int ln=0;
int beginln=0;
private Producer<String,String> inner;
java.util.Random ran = new Random();
String topicname=null;

public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException
{
file=new File(path);
this.drname=drname;
this.topicname=topicname;

Properties properties = new Properties();
//   properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));

properties.load(new FileInputStream("producer.properties"));

ProducerConfig config = new ProducerConfig(properties);

inner = new Producer<String, String>(config);
}

public void send(String topicName,String message) {
if(topicName == null || message == null){
return;
}
//   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
//随机作为key,hash分散到各个分区
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);
//   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message);
inner.send(km);

}

public void send(String topicName,Collection<String> messages) {
if(topicName == null || messages == null){
return;
}
if(messages.isEmpty()){
return;
}
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
for(String entry : messages){
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
kms.add(km);
}
inner.send(kms);
}

public void close(){
inner.close();
}

public String getNewFile(File file)
{
File[] fs=file.listFiles();
long maxtime=0;
String newfilename="";
for (int i=0;i<fs.length;i++)
{
if (fs[i].isFile()&&fs[i].lastModified()>maxtime)
{
maxtime=fs[i].lastModified();
newfilename=fs[i].getAbsolutePath();

}
}
return newfilename;
}
//写入文件名及行号
public void writePosition(String path,int rn)
{
try {
BufferedWriter out = new BufferedWriter(new FileWriter(drname+".position"));
out.write(path+","+rn);
out.close();
} catch (IOException e) {
}
}

public void run()
{

thisfile=getNewFile(file);
prefile=thisfile;
//访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件
try {
BufferedReader br=new BufferedReader(new FileReader(drname+".position"));
String line=br.readLine();
if (line!=null &&line.contains(","))
{
thisfile=line.split(",")[0];
prefile=thisfile;
beginln=Integer.parseInt(line.split(",")[1]);
}

} catch (FileNotFoundException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
catch (IOException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}

//指定文件可读可写
try {
randomFile = new LineNumberReader(new FileReader(thisfile));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
while (true)
{
try {
Thread.sleep(100);
//调用interrupt方法后
if(isInterrupted())
{
System.out.println("Interrupted...");
break;
}
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
//获得变化部分的
//  randomFile.seek(lastTimeFileSize);
String tmp = "";
while( (tmp = randomFile.readLine())!= null) {
int currln=randomFile.getLineNumber();
//beginln默认为0
if (currln>beginln)
send(topicname,new String(tmp.getBytes("utf8")));

ln++;
//每发生一条写一次影响效率
if (ln>100)
{
writePosition(thisfile,currln);
ln=0;
}

}
thisfile=getNewFile(file);
if(!thisfile.equals(prefile))

{
randomFile.close();
randomFile = new LineNumberReader(new FileReader(thisfile));
prefile=thisfile;
beginln=0;
}

} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

public static void main(String[] args) throws Exception {
/*
LogView view = new LogView();

final File tmpLogFile = new File("D:\\test.txt");
view.realtimeShowLog(tmpLogFile);
*/
if (args.length!=2)
{
System.out.println("usage:topicname pathname");
System.exit(1);
}
String topicname=args[0];
String pathname=args[1];

HashMap<String,TailFileThread> hm=new HashMap<String,TailFileThread>();
File tmpLogFile = new File(pathname);
File[] fs=tmpLogFile.listFiles();
while (true)
{
fs=tmpLogFile.listFiles();
for (int i=0;i<fs.length;i++)
{
if(fs[i].isDirectory())
{
String path=fs[i].getAbsolutePath();
//以drname作为position文件名
String drname=fs[i].getName();
//如果该目录对应的处理线程已经存在,判断是否存活

if (drname.contains("xx") || drname.contains("yy") || drname.contains("zz") || drname.contains("aa")
)
{
if (hm.containsKey(path))
{
if (!hm.get(path).isAlive())
{
hm.get(path).interrupt();
TailFileThread tt=new TailFileThread(path,drname,topicname);
tt.start();
hm.put(path, tt);
}

}
//如果不存在,新建
else
{
TailFileThread tt=new TailFileThread(path,drname,topicname);
tt.start();
hm.put(path, tt);

}

}

}			//System.out.println(fs[i].getAbsolutePath());
}
Thread.sleep(100);
}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: