flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统
2016-11-10 13:41
831 查看
搭建前提:
Hadoop2.6、spark1.6-hadoop-2.6集群都是正确搭建并可运行
一 、需求描述
日志文件预处理:运营商数据 kafka做队列缓冲 flume分发 streaming计算 HDFS存储
二、 系统搭建
No.1 flume-ng 1.6集群
1.下载安装并配置好flume的运行环境
2.编写配置文件
# ------------------- 定义数据流----------------------
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel
#-------- kafkaSource相关配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
agent.sources.kafkaSource.topic = T20161031
#agent.sources.kafkaSource.groupId = flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000
#------- memoryChannel相关配置-------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
3.启动Flume
bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console
No.2 kafka 2.10-0.10
1.下载安装并配置kafka集群
2.创建topic
bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181 --replication-factor 5 --partition 5 --topic T20161031
3.开启生产者线程并向T20161031写入数据
bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161031
4.开启消费者线程查看消息是否写入当前topic
bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181 --topic T20161031 --from-beginning
5.关闭当前消费者线程
No.3 sparkStreaming 1.6.2
1. 下载jar包 (这里注意当前Hadoop集群的版本,我的是2.6.X)
scala-library.jar
spark-assembly-1.6.2-hadoop2.6.0.jar
2.编写程序
package com.unisk.spark.sparkStreaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
public class StreamingOnHDFS {
public static void main(String[] args){
final SparkConf conf = new
SparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;
Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒
final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory, conf) ;
}
} ;
JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;
JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ;
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays. asList(line.split(" ")) ;
}
}) ;
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,
Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1) ;
}
}) ;
JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,
Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}) ;
wordscount.print();
jsc.start() ;
jsc.awaitTermination() ;
jsc.close() ;
}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){
System. out.println("==========Creating new context==============") ;
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;
ssc.checkpoint(checkpointDirectory) ;
return ssc;
}
}
3. 打包(打成runnable jar)并上传
4. 开启spark1.6集群
5. 进入spark安装目录
bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar
6. 简单的wordcount程序,print方法将结果打印出来,若需要将结果存储至其他位置如redis等,可使用foreachRDD方法
Hadoop2.6、spark1.6-hadoop-2.6集群都是正确搭建并可运行
一 、需求描述
日志文件预处理:运营商数据 kafka做队列缓冲 flume分发 streaming计算 HDFS存储
二、 系统搭建
No.1 flume-ng 1.6集群
1.下载安装并配置好flume的运行环境
2.编写配置文件
# ------------------- 定义数据流----------------------
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel
#-------- kafkaSource相关配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
agent.sources.kafkaSource.topic = T20161031
#agent.sources.kafkaSource.groupId = flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000
#------- memoryChannel相关配置-------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
3.启动Flume
bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console
No.2 kafka 2.10-0.10
1.下载安装并配置kafka集群
2.创建topic
bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181 --replication-factor 5 --partition 5 --topic T20161031
3.开启生产者线程并向T20161031写入数据
bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161031
4.开启消费者线程查看消息是否写入当前topic
bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181 --topic T20161031 --from-beginning
5.关闭当前消费者线程
No.3 sparkStreaming 1.6.2
1. 下载jar包 (这里注意当前Hadoop集群的版本,我的是2.6.X)
scala-library.jar
spark-assembly-1.6.2-hadoop2.6.0.jar
2.编写程序
package com.unisk.spark.sparkStreaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
public class StreamingOnHDFS {
public static void main(String[] args){
final SparkConf conf = new
SparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;
Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒
final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory, conf) ;
}
} ;
JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;
JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ;
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays. asList(line.split(" ")) ;
}
}) ;
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,
Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1) ;
}
}) ;
JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,
Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}) ;
wordscount.print();
jsc.start() ;
jsc.awaitTermination() ;
jsc.close() ;
}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){
System. out.println("==========Creating new context==============") ;
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;
ssc.checkpoint(checkpointDirectory) ;
return ssc;
}
}
3. 打包(打成runnable jar)并上传
4. 开启spark1.6集群
5. 进入spark安装目录
bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar
6. 简单的wordcount程序,print方法将结果打印出来,若需要将结果存储至其他位置如redis等,可使用foreachRDD方法
相关文章推荐
- flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统
- flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统
- 使用Flume+Kafka+SparkStreaming进行实时日志分析
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算 + Spark 基于pyspark下的实时日志分析
- 使用Flume+Kafka+SparkStreaming进行实时日志分析
- [置顶] 使用Flume+Kafka+SparkStreaming进行实时日志分析
- 使用Flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【公安大数据】
- flume-kafka-sparkStreaming日志分析
- 使用 Kafka 和 Spark Streaming 构建实时数据处理系统(转)
- 自己标注(不注意坑不少)-Spark+Kafka构建实时分析Dashboard案例——步骤三:Spark Streaming实时处理数据
- 使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- Flume+Kafka+Sparkstreaming日志分析
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算
- 使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- 基于 Kafka 和 ElasticSearch,LinkedIn是如何构建实时日志分析系统的
- Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- 实时日志流系统(kafka-flume-hdfs)
- Spark Streaming结合Flume、Kafka最新最全日志分析