java实时监听日志写入kafka(多目录)
2017-05-25 17:51
381 查看
目的
实时监听多个目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)源码
[java] view plain copyimport java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
;
public class XTail_Line {
public static class TailFileThread extends Thread
{
File file;
LineNumberReader randomFile=null;
String newfile=null;
String thisfile=null;
String prefile=null;
private long lastTimeFileSize = 0;
private String drname=null;
int ln=0;
int beginln=0;
private Producer<String,String> inner;
java.util.Random ran = new Random();
String topicname=null;
public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException
{
file=new File(path);
this.drname=drname;
this.topicname=topicname;
Properties properties = new Properties();
// properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
properties.load(new FileInputStream("producer.properties"));
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer<String, String>(config);
}
public void send(String topicName,String message) {
if(topicName == null || message == null){
return;
}
// KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
//随机作为key,hash分散到各个分区
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);
// KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message);
inner.send(km);
}
public void send(String topicName,Collection<String> messages) {
if(topicName == null || messages == null){
return;
}
if(messages.isEmpty()){
return;
}
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
for(String entry : messages){
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
kms.add(km);
}
inner.send(kms);
}
public void close(){
inner.close();
}
public String getNewFile(File file)
{
File[] fs=file.listFiles();
long maxtime=0;
String newfilename="";
for (int i=0;i<fs.length;i++)
{
if (fs[i].isFile()&&fs[i].lastModified()>maxtime)
{
maxtime=fs[i].lastModified();
newfilename=fs[i].getAbsolutePath();
}
}
return newfilename;
}
//写入文件名及行号
public void writePosition(String path,int rn)
{
try {
BufferedWriter out = new BufferedWriter(new FileWriter(drname+".position"));
out.write(path+","+rn);
out.close();
} catch (IOException e) {
}
}
public void run()
{
thisfile=getNewFile(file);
prefile=thisfile;
//访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件
try {
BufferedReader br=new BufferedReader(new FileReader(drname+".position"));
String line=br.readLine();
if (line!=null &&line.contains(","))
{
thisfile=line.split(",")[0];
prefile=thisfile;
beginln=Integer.parseInt(line.split(",")[1]);
}
} catch (FileNotFoundException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
catch (IOException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
//指定文件可读可写
try {
randomFile = new LineNumberReader(new FileReader(thisfile));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
while (true)
{
try {
Thread.sleep(100);
//调用interrupt方法后
if(isInterrupted())
{
System.out.println("Interrupted...");
break;
}
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
//获得变化部分的
// randomFile.seek(lastTimeFileSize);
String tmp = "";
while( (tmp = randomFile.readLine())!= null) {
int currln=randomFile.getLineNumber();
//beginln默认为0
if (currln>beginln)
send(topicname,new String(tmp.getBytes("utf8")));
ln++;
//每发生一条写一次影响效率
if (ln>100)
{
writePosition(thisfile,currln);
ln=0;
}
}
thisfile=getNewFile(file);
if(!thisfile.equals(prefile))
{
randomFile.close();
randomFile = new LineNumberReader(new FileReader(thisfile));
prefile=thisfile;
beginln=0;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
public static void main(String[] args) throws Exception {
/*
LogView view = new LogView();
final File tmpLogFile = new File("D:\\test.txt");
view.realtimeShowLog(tmpLogFile);
*/
if (args.length!=2)
{
System.out.println("usage:topicname pathname");
System.exit(1);
}
String topicname=args[0];
String pathname=args[1];
HashMap<String,TailFileThread> hm=new HashMap<String,TailFileThread>();
File tmpLogFile = new File(pathname);
File[] fs=tmpLogFile.listFiles();
while (true)
{
fs=tmpLogFile.listFiles();
for (int i=0;i<fs.length;i++)
{
if(fs[i].isDirectory())
{
String path=fs[i].getAbsolutePath();
//以drname作为position文件名
String drname=fs[i].getName();
//如果该目录对应的处理线程已经存在,判断是否存活
if (drname.contains("xx") || drname.contains("yy") || drname.contains("zz") || drname.contains("aa")
)
{
if (hm.containsKey(path))
{
if (!hm.get(path).isAlive())
{
hm.get(path).interrupt();
TailFileThread tt=new TailFileThread(path,drname,topicname);
tt.start();
hm.put(path, tt);
}
}
//如果不存在,新建
else
{
TailFileThread tt=new TailFileThread(path,drname,topicname);
tt.start();
hm.put(path, tt);
}
}
} //System.out.println(fs[i].getAbsolutePath());
}
Thread.sleep(100);
}
}
}
转:http://blog.csdn.net/u011750989/article/details/21957741
相关文章推荐
- java实时监听日志写入kafka(多目录)
- java实时监听日志写入kafka
- java实时监听日志写入kafka(转)
- java实时监听日志写入kafka(转)
- java实时监听日志写入kafka
- Java实现系统目录实时监听更新。
- 使用Log4j将程序日志实时写入Kafka
- 利用Maxwell组件实时监听Mysql的binlog日志,并且把解析的json格式数据发送到kafka窗口供实时消费
- 实时监听文件写入kafka
- 使用Log4j将程序日志实时写入Kafka
- Flume读取日志数据并写入到Kafka,ConsoleConsumer进行实时消费
- 使用Log4j将程序日志实时写入Kafka
- 使用Log4j将程序日志实时写入Kafka
- Java实现系统目录实时监听更新。
- 将Log4j日志实时写入Kafka
- 使用Log4j将程序日志实时写入Kafka(转)
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
- 将java的console日志写入文件
- Java 自定义日志写入