
Building a Real-Time Log Analysis System with Flume + Kafka + Spark Streaming + HDFS

2016-11-10 13:41
Prerequisites:
A Hadoop 2.6 cluster and a Spark 1.6 (spark-1.6-hadoop-2.6) cluster are already set up correctly and running.

I. Requirements

  Log file preprocessing: carrier (operator) data is written to Kafka, which acts as a buffering queue; Flume pulls from Kafka and distributes the data into HDFS for storage; Spark Streaming then runs the computation over the files landed in HDFS.

II. System Setup
No.1  Flume-NG 1.6 cluster

1. Download, install, and configure the Flume runtime environment

2. Write the configuration file
# ------------------- Define the data flow ----------------------
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource settings -----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
agent.sources.kafkaSource.topic = T20161031
#agent.sources.kafkaSource.groupId = flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000

#------- memoryChannel settings -------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000

#--------- hdfsSink settings ------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream

3. Start Flume
bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console

No.2 kafka 2.10-0.10

1. Download, install, and configure the Kafka cluster

2. Create the topic
bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181 --replication-factor 5 --partitions 5 --topic T20161031

3. Start a console producer and write data to T20161031 (a programmatic producer is sketched after the command)
bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161031
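
The console producer above is handy for a smoke test. For feeding real log lines from an application, a minimal Java producer against Kafka 0.10 could look like the sketch below; the broker h2:9092 and topic T20161031 come from the commands in this section, while the class name and the sample message are illustrative assumptions (the kafka-clients jar must be on the classpath):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper class, not part of the original post
public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "h2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // In a real deployment the value would be a line read from the carrier log files
            producer.send(new ProducerRecord<String, String>("T20161031", "sample log line"));
        } finally {
            producer.close();
        }
    }
}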

4. Start a console consumer to verify that messages are being written to the topic
bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181 --topic T20161031 --from-beginning

5. Close the console consumer

No.3 Spark Streaming 1.6.2

1. Download the dependency jars (note the version of your Hadoop cluster; mine is 2.6.x)
scala-library.jar
spark-assembly-1.6.2-hadoop2.6.0.jar

2. Write the program
package com.unisk.spark.sparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;

public class StreamingOnHDFS {

    public static void main(String[] args) {
        final SparkConf conf = new SparkConf()
                .setMaster("spark://h4:7077")
                .setAppName("SparkStreamingOnHDFS");
        // Checkpoint directory on HDFS; getOrCreate() rebuilds the context from it after a restart
        final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory, conf);
            }
        };
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);

        // Watch the HDFS directory that the Flume hdfsSink writes to
        JavaDStream<String> lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume");

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word
        JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordscount.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

    private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
        System.out.println("==========Creating new context==============");
        // Batch interval: 5 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.checkpoint(checkpointDirectory);
        return ssc;
    }
}

3. Package the program as a runnable jar and upload it

4. Start the Spark 1.6 cluster

5. From the Spark installation directory, submit the job:
bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar

6. This is a simple word-count job; the print method just prints the results to the driver output. To persist the results somewhere else, such as Redis, use the foreachRDD method instead, as sketched below.
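
A minimal sketch of that variant, assuming the wordscount DStream from the program above (additional imports needed: org.apache.spark.api.java.JavaPairRDD, org.apache.spark.api.java.function.VoidFunction, java.util.Iterator); the Redis write itself is only indicated in comments, since the concrete client (e.g. Jedis) depends on your environment:

// Replace wordscount.print() with a foreachRDD call that pushes results to an external store
wordscount.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
    @Override
    public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
            @Override
            public void call(Iterator<Tuple2<String, Integer>> records) throws Exception {
                // Open one connection to the external store per partition here (e.g. a Jedis client)
                while (records.hasNext()) {
                    Tuple2<String, Integer> record = records.next();
                    // e.g. jedis.set(record._1(), String.valueOf(record._2()));
                    System.out.println(record._1() + " -> " + record._2());
                }
                // Close the connection here
            }
        });
    }
});

Opening the connection inside foreachPartition (rather than outside the closure) avoids trying to serialize a non-serializable client and creates only one connection per partition instead of one per record.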