flume监控spoolDir日志到HDFS整个流程小Demo
2016-10-13 16:15
337 查看
今天做了一个flume监控spoolDir日志到HDFS整个流程的小Demo。
流程:
1.编写java代码,随机生成用户ID号码,区县号码,乡镇号码(区县和乡镇号码用随机的三位字母表示)和个人总收入格式样例:779362a1-bf04-468a-91b6-a19d772f41fa####AFC####sfe####8091748。
2.用一个线程循环执行,用Thread.sleep(100)来控制线程执行一次停止100ms,防止cpu在死循环中过载,一秒生成10条数据,用log4j生成相应的日志到指定的目录下面,其中日志每分钟就生成一个格式为yyyy-MM-dd-HH-mm 例如:service.log.2016-10-13-11-32,最后在linux下用shell脚本启动这个java程序。
3.编写shell脚本,定时每分钟从log4j生成的脚本copy当前时间前一分钟产生的日志文件到被flume监控的文件夹内,注意copy过去应该在文件名后面加上.COMPLETED,copy完成后又把这个文件名的.COMPLETED去掉。
例如:
主要是防止源日志文件太大copy的时候会花比较长的时间,到时候flume会抛异常,当然你还可以使用另外一种解决方案:直接move源日志文件到被flume监控的目录中,不过这种方案没有上面的方案优。
4.配置flume的conf文件
5.编辑crontab每分钟执行这个脚本来拉取源日志文件。
环境:
1.使用的虚拟机为:vmware12
2.centOS6.5
3.hadoop2.2.0 单节点(主要测试用,所以直接用的单节点)
4.Flume 1.6.0 (刚开始用的flume-ng-1.5.0-cdh5.4.5,结果配置中的一个方法在这个版本的flume包里找不到抛异常,就换了个版本搞定)
java代码如下:
其中需要配置log4j配置文件,以及添加log4j的依赖jar包
log4j的配置文件:
启动java程序的shell脚本 start.sh
定时拉取源日志的shell脚本 mvlog.sh
flume的配置文件:
启动flume的命令:
crontab的配置
准备工作进行好之后,执行java程序
sh ./start.sh
产生如下日志文件:
日志的内容:
定时任务会拉取这个目录下的日志到monitor目录下,flume就会收集,手机完成后会在文件名添加.COMPLETED后缀:
hdfs的flume下面就会生成当天时间格式化的目录,并且收集的数据会被put到该目录下:
java代码一直生成日志文件,crontab每隔一分钟都会拉取日志到flume监控的目录下面,flume就会把该文件收集到hdfs,这样一个简单的flume监控spoolDir日志到HDFS整个流程的小Demo就实现了。
这里给出老版本和新版本的配置demo,因为官网上面已经明确指明了,新版本已经将一些配置舍弃了:
备注下kafkasource配置(老版本):
测试了一下,启动多个flume配置都是同一个group消费同一个kafka数据,也遵循kafka的算法 topic的partition只有三个,我启动了4个flume发现只有三个flume在采集数据,而第四个flume程序在空跑。
测试结果:
新版本:
流程:
1.编写java代码,随机生成用户ID号码,区县号码,乡镇号码(区县和乡镇号码用随机的三位字母表示)和个人总收入格式样例:779362a1-bf04-468a-91b6-a19d772f41fa####AFC####sfe####8091748。
2.用一个线程循环执行,用Thread.sleep(100)来控制线程执行一次停止100ms,防止cpu在死循环中过载,一秒生成10条数据,用log4j生成相应的日志到指定的目录下面,其中日志每分钟就生成一个格式为yyyy-MM-dd-HH-mm 例如:service.log.2016-10-13-11-32,最后在linux下用shell脚本启动这个java程序。
3.编写shell脚本,定时每分钟从log4j生成的脚本copy当前时间前一分钟产生的日志文件到被flume监控的文件夹内,注意copy过去应该在文件名后面加上.COMPLETED,copy完成后又把这个文件名的.COMPLETED去掉。
例如:
#首先 cp ./log4j/service.log.2016-10-13-11-37 ./monitor/service.log.2016-10-13-11-37.COMPLETED #然后 mv ./monitor/service.log.2016-10-13-11-37.COMPLETED ./monitor/service.log.2016-10-13-11-37
主要是防止源日志文件太大copy的时候会花比较长的时间,到时候flume会抛异常,当然你还可以使用另外一种解决方案:直接move源日志文件到被flume监控的目录中,不过这种方案没有上面的方案优。
4.配置flume的conf文件
5.编辑crontab每分钟执行这个脚本来拉取源日志文件。
环境:
1.使用的虚拟机为:vmware12
2.centOS6.5
3.hadoop2.2.0 单节点(主要测试用,所以直接用的单节点)
4.Flume 1.6.0 (刚开始用的flume-ng-1.5.0-cdh5.4.5,结果配置中的一个方法在这个版本的flume包里找不到抛异常,就换了个版本搞定)
java代码如下:
其中需要配置log4j配置文件,以及添加log4j的依赖jar包
package com.lijie.test; import java.util.UUID; import org.apache.log4j.Logger; public class DataProduct { public static void main(String[] args) { Thread t1 = new Thread(new A()); t1.start(); } } class A extends Thread { private final Logger log = Logger.getLogger(A.class); public void run() { //无限循环 while (true) { //随机产生一个用户uuid UUID userId = UUID.randomUUID(); //产生一个随机的用户总资产 int num = (int) (Math.random() * 10000000) + 100000; //产生一个随意的县名 StringBuilder sb = new StringBuilder(); for (int i = 0; i < 3; i++) { char a = (char) (Math.random() * (90 - 65) + 65); sb.append(a); } String xian = sb.toString(); //产生一个随机的镇名 StringBuilder sb1 = new StringBuilder(); for (int i = 0; i < 3; i++) { char a = (char) (Math.random() * (122 - 97) + 97); sb1.append(a); } String zhen = sb1.toString(); //生成日志 log.info(userId + "####" + xian + "####" + zhen + "####" + num); //停0.1秒钟 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
log4j的配置文件:
log4j.rootCategory=INFO, stdout , R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%t] %C.%M(%L) | %m%n log4j.appender.R=org.apache.log4j.DailyRollingFileAppender log4j.appender.R.File=/home/hadoop/log4j/service.log log4j.appender.R.DatePattern = '.'yyyy-MM-dd-HH-mm log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d %p [%t] %C.%M(%L) | %m%n log4j.logger.com.xxx=DEBUG log4j.logger.controllers=DEBUG log4j.logger.vo=DEBUG log4j.logger.notifiers=DEBUG log4j.logger.com.opensymphony.oscache=WARN log4j.logger.net.sf.navigator=WARN log4j.logger.org.apache.commons=WARN log4j.logger.org.apache.struts=WARN log4j.logger.org.displaytag=WARN log4j.logger.org.springframework=WARN log4j.logger.org.apache.velocity=FATAL
启动java程序的shell脚本 start.sh
APP_HOME=/home/hadoop/myjar APP_CLASSPATH=$APP_HOME/bin jarList=$(ls $APP_CLASSPATH|grep jar) echo $jarList for i in $jarList do APP_CLASSPATH="$APP_CLASSPATH/$i": done echo $APP_CLASSPATH export CLASSPATH=$CLASSPATH:$APP_CLASSPATH echo $CLASSPATH java -Xms50m -Xmx250m com.lijie.test.DataProduct echo Linux Test End
定时拉取源日志的shell脚本 mvlog.sh
#! /bin/bash DIR=$(cd `dirname $0`; pwd) mydate=`date +%Y-%m-%d-%H-%M -d '-1 minutes'` logName="service.log" monitorDir="/home/hadoop/monitor/" filePath="${DIR}"/log4j/"" fileName="${logName}"".""${mydate}" echo "文件地址:${filePath}" echo "文件名字:${fileName}" if [ -f "${monitorDir}""${fileName}" ] then echo "文件存在,删除文件" rm -rf "${monitorDir}""${fileName}" fi echo "开始复制文件" cp "${filePath}${fileName}" "${monitorDir}${fileName}"".COMPLETED" echo "日志复制完成,更改名字" mv "${monitorDir}${fileName}"".COMPLETED" "${monitorDir}${fileName}" echo "日志改名完成" exit
flume的配置文件:
#agent名, source、channel、sink的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #具体定义source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/hadoop/monitor #具体定义channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #具体定义sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://192.168.80.123:9000/flume/%Y%m%d a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.useLocalTimeStamp = true #不按照条数生成文件 a1.sinks.k1.hdfs.rollCount = 0 #HDFS上的文件达到128M时生成一个文件 a1.sinks.k1.hdfs.rollSize = 134217728 #HDFS上的文件达到60秒生成一个文件 a1.sinks.k1.hdfs.rollInterval = 60 #组装source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动flume的命令:
../bin/flume-ng agent -n a1 -c conf -f ./flume-conf.properties -Dflume.root.logger=DEBUG,console
crontab的配置
#首先crontab -e编辑下面的代码然后保存 * * * * * sh /home/hadoop/mvlog.sh #然后启动crontab服务 service crond start
准备工作进行好之后,执行java程序
sh ./start.sh
产生如下日志文件:
日志的内容:
定时任务会拉取这个目录下的日志到monitor目录下,flume就会收集,手机完成后会在文件名添加.COMPLETED后缀:
hdfs的flume下面就会生成当天时间格式化的目录,并且收集的数据会被put到该目录下:
java代码一直生成日志文件,crontab每隔一分钟都会拉取日志到flume监控的目录下面,flume就会把该文件收集到hdfs,这样一个简单的flume监控spoolDir日志到HDFS整个流程的小Demo就实现了。
这里给出老版本和新版本的配置demo,因为官网上面已经明确指明了,新版本已经将一些配置舍弃了:
备注下kafkasource配置(老版本):
#agent名, source、channel、sink的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #具体定义source a1.sources.r1.type =org.apache.flume.source.kafka.KafkaSource a1.sources.r1.zookeeperConnect = lijie:2181 a1.sources.r1.groupId = flume a1.sources.r1.topic = topic001 a1.sources.r1.kafka.consumer.timeout.ms = 100 #具体定义channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #具体定义sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://192.168.80.123:9000/flumetest/%Y%m%d a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.useLocalTimeStamp = true #不按照条数生成文件 a1.sinks.k1.hdfs.rollCount = 0 #HDFS上的文件达到128M时生成一个文件 a1.sinks.k1.hdfs.rollSize = 134217728 #HDFS上的文件达到60秒生成一个文件 a1.sinks.k1.hdfs.rollInterval = 60 #组装source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
测试了一下,启动多个flume配置都是同一个group消费同一个kafka数据,也遵循kafka的算法 topic的partition只有三个,我启动了4个flume发现只有三个flume在采集数据,而第四个flume程序在空跑。
测试结果:
新版本:
#agent名, source、channel、sink的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #具体定义source a1.sources.r1.type =org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers=testmaster:9092,testslave01:9092,testslave02:9092 a1.sources.r1.kafka.topics = kafkatest a1.sources.r1.kafka.consumer.group.id = lijiegrp a1.sources.r1.kafka.consumer.timeout.ms = 100 #具体定义channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #具体定义sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://testmaster:8020/flumetest/%Y%m%d a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.useLocalTimeStamp = true #不按照条数生成文件 a1.sinks.k1.hdfs.rollCount = 0 #HDFS上的文件达到128M时生成一个文件 a1.sinks.k1.hdfs.rollSize = 134217728 #HDFS上的文件达到60秒生成一个文件 a1.sinks.k1.hdfs.rollInterval = 60 #组装source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
相关文章推荐
- flume监控spoolDir日志到HDFS整个流程小Demo
- flume监控spoolDir日志到HDFS(从日志产生到hdfs上一整套)
- [ETL] Flume 理论与demo(Taildir Source & Hdfs Sink)
- flume:spooldir采集日志,kafka输出的配置问题
- Flume(4)实用环境搭建:source(spooldir)+channel(file)+sink(hdfs)方式
- Flume-ng spoolDir目录监控踩过的坑
- Flume-ng 1.7.0 安装、配置及说明之1-直接读取Nginx日志存入HDFS
- Flume-NG + HDFS + HIVE 日志收集分析 | EyeLu技术Blog
- flume学习(五):flume将log4j日志数据写入到hdfs
- 使用Flume向HDFS持久化数据(日志)
- Linux的系统调用、网络连接状态、磁盘I/O;可疑行为监控/日志收集、SHELL命令执行流程
- Flume向hdfs发送日志文件配置
- 【日志处理、监控ELK、Kafka、Flume等相关资料】
- flume实例二、监听目录日志上传到HDFS文件系统
- Flume 采集rsyslog整个配置和流程
- Flume 收集Nginx日志到Hdfs Tail-to-hdfs sink
- flume1.7.0-taildirSource 支持多文件监控和断点续传
- JBPM4基础篇05-设计请假流程,流程的发起,执行,驳回,监控web Demo
- Flume的hdfsSink的roll参数不生效的原因(日志上传hdfs)
- JBPM4基础篇05-设计请假流程,流程的发起,执行,驳回,监控web Demo