您的位置:首页 > 运维架构

日志系统搭建一(flume+hadoop+hive)

2014-05-18 18:23 357 查看
由于公司现在业务日志量逐渐增加,一台服务器io满足不了需求,打在多台服务器上面,准备搭建一个简单日志系统。

许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:

(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;

(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;

(3) 具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。

本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开源的日志系统,包括facebook的scribe,apache的chukwa,linkedin的kafka和cloudera的flume等。

下面表格对比了这四个系统:



通过对比分析,选用了flume,主要安装简单,易配置,功能强大。

flume的安装就不说了。

主要遇到一个配置问题,日志要求按照天归类,很多网友都会有这个问题,我解决方法如下:

1.下载flume 1.3.1版本

2.配置文件如下

#Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#a1.sources.testtail.interceptors = i1

#a1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data_disk/logs/flume/

a1.sources.r1.fileHeader = false

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

#a1.sources.r1.command = tail -F  /data_disk/logs/2014-05-11.txt

#a1.sources.r1.port = 44444

#Describe the sink

#a1.sinks.k1.type = logger

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://10.160.0.237:9000/flume/%Y-%m-%d/

a1.sinks.k1.hdfs.filePrefix = update.u.gsie.cn_

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 1

#a1.sinks.k1.hdfs.roundValue = 1

a1.sinks.k1.hdfs.writeFormat = Text

#a1.sinks.k1.hdfs.useLocalTimeStamp = true

#Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3.hadoop安装以及配置

参照网上配置即可

4.hadoop程序编写

cd /data_disk/hadoop_code/

mkdir -p FirstJar/class

cd FirstJar

  vim WordCount,java

package test;

       

import java.io.IOException;

import java.util.*;

       

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

       

public class WordCount{

    public static class Map extends Mapper<LongWritable, TextText,
IntWritable> {

        private final static IntWritable
one = new IntWritable(1);

        private Text word
= new Text();

       

        public void map(LongWritable
key, Text value, Context context) throws IOExceptionInterruptedException {

            String line
= value.toString();

            StringTokenizer tokenizer
= new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {

                word.set(tokenizer.nextToken());

                context.write(word, one);

            }

        }

    }

       

    public static class Reduce extends Reducer<Text,
IntWritable, Text,
IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOExceptionInterruptedException {

            int sum = 0;

            for (IntWritable val : values) {

                sum += val.get();

            }

            context.write(key, new IntWritable(sum));

        }

    }

       

    public static void main(String[] args) throws Exception {

        Configuration conf
= new Configuration();

       

        Job job = new Job(conf, "wordcount");

   

        job.setJarByClass(WordCount.class);  
 // +++++

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

       

        job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);

       

        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);

       

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

       

        job.waitForCompletion(true);

    }    

}
hadoop 输入文件:

vim file01

hello world by world

vim file02

hello hadoop by hadoop

vim input.txt

world 1 2 3 ii pp world hadoop 1 hadoop

hadoop dfs -put input.txt input

hadoop dfs -put file0* input

javac -classpath /data_disk/software/hadoop/hadoop-core-1.2.1.jar -d class WordCount.java

jar -cvf wordcount.jar -C class .

hadoop jar wordcount.jar test.WordCount input output

 hadoop dfs -ls output

Found 3 items

-rw-r--r--   3 root supergroup          0 2014-05-18 17:54 /user/root/output/_SUCCESS

drwxr-xr-x   - root supergroup          0 2014-05-18 17:53 /user/root/output/_logs

-rw-r--r--   3 root supergroup         39 2014-05-18 17:54 /user/root/output/part-r-00000

 hadoop dfs -ls output/part-r-00000

5.hive

利用hive把log数据导入表中,统计计算
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  日志 flume hadoop hive