日志系统搭建一(flume+hadoop+hive)
2014-05-18 18:23
357 查看
由于公司现在业务日志量逐渐增加,一台服务器io满足不了需求,打在多台服务器上面,准备搭建一个简单日志系统。
许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:
(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;
(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;
(3) 具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。
本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开源的日志系统,包括facebook的scribe,apache的chukwa,linkedin的kafka和cloudera的flume等。
下面表格对比了这四个系统:
通过对比分析,选用了flume,主要安装简单,易配置,功能强大。
flume的安装就不说了。
主要遇到一个配置问题,日志要求按照天归类,很多网友都会有这个问题,我解决方法如下:
1.下载flume 1.3.1版本
2.配置文件如下
#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#a1.sources.testtail.interceptors = i1
#a1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data_disk/logs/flume/
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#a1.sources.r1.command = tail -F /data_disk/logs/2014-05-11.txt
#a1.sources.r1.port = 44444
#Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://10.160.0.237:9000/flume/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = update.u.gsie.cn_
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 1
#a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.writeFormat = Text
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.hadoop安装以及配置
参照网上配置即可
4.hadoop程序编写
cd /data_disk/hadoop_code/
mkdir -p FirstJar/class
cd FirstJar
vim WordCount,java
package test;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount{
public static class Map extends Mapper<LongWritable, Text, Text,
IntWritable> {
private final static IntWritable
one = new IntWritable(1);
private Text word
= new Text();
public void map(LongWritable
key, Text value, Context context) throws IOException, InterruptedException {
String line
= value.toString();
StringTokenizer tokenizer
= new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text,
IntWritable, Text,
IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf
= new Configuration();
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
// +++++
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
hadoop 输入文件:
vim file01
hello world by world
vim file02
hello hadoop by hadoop
vim input.txt
world 1 2 3 ii pp world hadoop 1 hadoop
hadoop dfs -put input.txt input
hadoop dfs -put file0* input
javac -classpath /data_disk/software/hadoop/hadoop-core-1.2.1.jar -d class WordCount.java
jar -cvf wordcount.jar -C class .
hadoop jar wordcount.jar test.WordCount input output
hadoop dfs -ls output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2014-05-18 17:54 /user/root/output/_SUCCESS
drwxr-xr-x - root supergroup 0 2014-05-18 17:53 /user/root/output/_logs
-rw-r--r-- 3 root supergroup 39 2014-05-18 17:54 /user/root/output/part-r-00000
hadoop dfs -ls output/part-r-00000
5.hive
利用hive把log数据导入表中,统计计算
许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:
(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;
(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;
(3) 具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。
本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开源的日志系统,包括facebook的scribe,apache的chukwa,linkedin的kafka和cloudera的flume等。
下面表格对比了这四个系统:
通过对比分析,选用了flume,主要安装简单,易配置,功能强大。
flume的安装就不说了。
主要遇到一个配置问题,日志要求按照天归类,很多网友都会有这个问题,我解决方法如下:
1.下载flume 1.3.1版本
2.配置文件如下
#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#a1.sources.testtail.interceptors = i1
#a1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data_disk/logs/flume/
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#a1.sources.r1.command = tail -F /data_disk/logs/2014-05-11.txt
#a1.sources.r1.port = 44444
#Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://10.160.0.237:9000/flume/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = update.u.gsie.cn_
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 1
#a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.writeFormat = Text
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.hadoop安装以及配置
参照网上配置即可
4.hadoop程序编写
cd /data_disk/hadoop_code/
mkdir -p FirstJar/class
cd FirstJar
vim WordCount,java
package test;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount{
public static class Map extends Mapper<LongWritable, Text, Text,
IntWritable> {
private final static IntWritable
one = new IntWritable(1);
private Text word
= new Text();
public void map(LongWritable
key, Text value, Context context) throws IOException, InterruptedException {
String line
= value.toString();
StringTokenizer tokenizer
= new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text,
IntWritable, Text,
IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf
= new Configuration();
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
// +++++
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
hadoop 输入文件:
vim file01
hello world by world
vim file02
hello hadoop by hadoop
vim input.txt
world 1 2 3 ii pp world hadoop 1 hadoop
hadoop dfs -put input.txt input
hadoop dfs -put file0* input
javac -classpath /data_disk/software/hadoop/hadoop-core-1.2.1.jar -d class WordCount.java
jar -cvf wordcount.jar -C class .
hadoop jar wordcount.jar test.WordCount input output
hadoop dfs -ls output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2014-05-18 17:54 /user/root/output/_SUCCESS
drwxr-xr-x - root supergroup 0 2014-05-18 17:53 /user/root/output/_logs
-rw-r--r-- 3 root supergroup 39 2014-05-18 17:54 /user/root/output/part-r-00000
hadoop dfs -ls output/part-r-00000
5.hive
利用hive把log数据导入表中,统计计算
相关文章推荐
- flume日志收集系统搭建
- 基于flume+kafka+storm日志收集系统搭建
- Flume+Hadoop+Hive的离线分析系统基本架构
- Flume+Elasticsearch搭建实时日志分析系统
- Flume + Solr + log4j搭建web日志采集系统
- Flume-ng 1.6.0+ Elasticsearch 1.7.2+ Kibana4 日志收集分析系统环境搭建及介绍
- Flume1.7.0+Elasticsearch1.7.5+Kibana4.11日志收集分析系统环境搭建
- 教你一步搭建Flume分布式日志系统
- Flume + HDFS + Hive日志收集系统
- 通过hadoop + hive搭建离线式的分析系统之快速搭建一览
- Hadoop硬实战之一:使用flume将系统日志文件导入HDFS
- Flume+Hadoop+Hive的离线分析系统基本架构
- Spark - Hadoop-Spark-Hive-Kafka-Flume 分布式集群搭建 与 spark rdd常用api操作
- 日志系统系列flume-ng+kafka+storm+HDFS搭建实验
- Flume+Elasticsearch搭建实时日志分析系统
- Flume+Hadoop+Hive的离线分析系统基本架构
- Flume+Hadoop+Hive的离线分析系统基本架构
- Flume + Solr + log4j搭建web日志采集系统
- Flume + Solr + log4j搭建web日志采集系统