您的位置:首页 > 其它

日志采集分析项目Demo

2017-11-17 16:10 555 查看
一.配置linux定时运行上传日志文件脚本
配置脚本:

       # 编辑命令是crontab -e

# 查看命令是crontab -l

分  时  日  月  周  命令

第1列表示分钟1~59 每分钟用*或者 */1表示
第2列表示小时1~23(0表示0点)
第3列表示日期1~31
第4列表示月份1~12
第5列标识号星期0~6(0表示星期天)
第6列要运行的命令
1分钟运行一次脚本
* * * * * sh /home/willli/apps/uploadFile2Hdfs.v2.sh

*/1 * * * * sh /home/willli/apps/uploadFile2Hdfs.v2.sh

二.配置日志文件采集脚本(uploadFile2Hdfs.v2.sh)
#!/bin/bash
#set java env
export JAVA_HOME=/usr/lib/jvm/jdk1.7.0_79
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
#set hadoop env
export HADOOP_HOME=/home/willli/hadoop-2.6.4
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
#虽然上传到Hadoop集群上了,但是原始文件还在。如何处理?
#日志文件的名称都是xxxx.log1,再次上传文件时,因为hdfs上已经存在了,会报错。如何处理?
#1、先将需要上传的文件移动到待上传目录
#2、在讲文件移动到待上传目录时,将文件按照一定的格式重名名
#/export/software/hadoop.log1   /export/data/click_log/lijing_click_log_{date}
#日志文件存放的目录
log_src_dir=/home/willli/hadoop-2.6.4/logs/log/

#待上传文件存放的目录
log_toupload_dir=/home/willli/hadoop-2.6.4/logs/toupload/
#日志文件上传到hdfs的根路径
day=`date +%Y_%m_%d`

#创建上传需要的hdfs文件目录
hadoop fs -mkdir /data/clickLog/$day/
hdfs_root_dir=/data/clickLog/$day/

#打印环境变量信息
echo "envs: hadoop_home: $HADOOP_HOME"
#读取日志文件的目录,判断是否有需要上传的文件
#echo "log_src_dir:"$log_src_dir
#ls $log_src_dir | while read fileName
do
if [[ "$fileName" == access.log.* ]]; then
# if [ "access.log" = "$fileName" ];then
date=`date +%Y_%m_%d_%H_%M_%S`
#将文件移动到待上传目录并重命名
#打印信息
#echo "moving $log_src_dir$fileName to $log_toupload_dir"lijing_click_log_$fileName"$date"
mv $log_src_dir$fileName $log_toupload_dir"lijing_click_log_$fileName"$date
#将待上传的文件path写入一个列表文件willDoing
echo $log_toupload_dir"lijing_click_log_$fileName"$date >> $log_toupload_dir"willDoing."$date
fi

done
#找到列表文件willDoing
ls $log_toupload_dir | grep will |grep -v "_COPY_" | grep -v "_DONE_" | while read line
do
#打印信息
echo "toupload is in file:"$line
#将待上传文件列表willDoing改名为willDoing_COPY_
mv $log_toupload_dir$line $log_toupload_dir$line"_COPY_"
#读列表文件willDoing_COPY_的内容(一个一个的待上传文件名)  ,此处的line 就是列表中的一个待上传文件的path
cat $log_toupload_dir$line"_COPY_" |while read line
do
#打印信息
echo "puting...$line to hdfs path.....$hdfs_root_dir"
hadoop fs -put $line $hdfs_root_dir
done
mv $log_toupload_dir$line"_COPY_"  $log_toupload_dir$line"_DONE_
4000
"
done

三.编写日志采集保存java程序

   demo:log4j打印日志输出到文件并滚动创建文件
   1)log4j.properties

log4j.rootLogger=INFO,mylog
log4j.appender.testlog = org.apache.log4j.RollingFileAppender
log4j.appender.testlog.layout = org.apache.log4j.PatternLayout
log4j.appender.testlog.layout.ConversionPattern = [%-5p][%-22d{yyyy/MM/dd HH\:mm\:ssS}][%l]%n%m%n
log4j.appender.testlog.Threshold = INFO
log4j.appender.testlog.ImmediateFlush = TRUE
log4j.appender.testlog.Append = TRUE
log4j.appender.testlog.File =  /home/willli/hadoop-2.6.4/logs/log/test.log
log4j.appender.testlog.MaxFileSize = 10KB
log4j.appender.testlog.MaxBackupIndex = 20


   2)TestDemo.java

public class TestDemo {
public static void main(String[] args) throws Exception {
Logger logger = LogManager.getLogger("mylog");
PropertyConfigurator.configure("log4j.properties");
Integer i = 0;
while (true) {
logger.info("mytestlog"+i+System.currentTimeMillis());
i++;
Thread.sleep(50);
if (i > 1000000)
break;
}
}

}


四.编写mapReduce程序分析日志文件

    1)任务划分

     1.MapTask:负责map阶段的整个数据处理流程

     2.ReduceTask:负责reduce阶段的整个数据处理流程

     3.MRAppMaster:负责整个程序的过程调度及状态协调

     2) Demo结构

   1.负责map阶段的整个数据处理流程WordCountMap

/*
* @KEYIN:单行偏移量
* @VALUEIN:单行文本内容
* @KEYOUT:读取的单个单词文本
* @VALUEOUT:用户自定义输出结果
* */
public class WordCountMap extends Mapper< LongWritable,Text, Text,IntWritable > {
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line=value.toString();
String[] words = line.split(" ");
for (String string : words) {
context.write(new Text(string), new IntWritable(1));
}
}
}


2.负责reduce阶段的整个数据处理流程

   
/*
* KEYIN, VALUEIN,对应map的 KEYOUT, VALUEOUT
*  KEYOUT 自定义输出,
*  VALUEOUT 自定义输出结果
* */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text word, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int count=0;
for (IntWritable value : values) {
count+=value.get();
}
context.write(word, new IntWritable(count));
}

}


3.负责整个程序的过程调度及状态协调
public class WordCountDriver {

public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
//指定业务涉及的mapper/reducer类
job.setMapperClass(WordCount.class);
job.setReducerClass(WordCountReducer.class);
//设置jar包所在本地路径
job.setJarByClass(WordCountDriver.class);
//设置mapper输出结果及类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定输入文件目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定文件输出目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean completion = job.waitForCompletion(true);
System.exit(completion?0:1);

}


5.分析或者保存结果

  输出结果存入文件,后期再更改此处
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: