Spark Streaming结合Flume、Kafka最新最全日志分析
2016-06-23 00:00
846 查看
摘要: 这是一份良好的可以执行的日志分析,Spark Streaming、Flume、Kafka 最新最全日志分析,请按照要求建立项目工程.关于jar包,建议自己编译.如果还有问题,实在 搞不定的,请联系我给你们编译好的相关jar包.一定要自己先动手尝试.
整合安装Flume+Kafka+SparkStreaming
jar 包自己编译,或者去载 http://search.maven.org/#search|ga|1|g%3A%22org.apache.spark%22%20AND%20v%3A%221.6.1%22
在这20秒内总共产生的日志行数为:
Spark Streaming结合Flume、Kafka最新最全日志分析
1.修改相应的配置文件
按照 http://my.oschina.net/sunmin/blog/692994整合安装Flume+Kafka+SparkStreaming
将flume/conf/producer.conf将需要监控的日志输出文件修改为本地的log 路径: /var/log/nginx/www.eric.aysaas.com-access.log
2.导入相关 jar 包
(快捷键 Ctrl + Alt + Shift + s),点击Project Structure界面左侧的“Modules”显示下图界面jar 包自己编译,或者去载 http://search.maven.org/#search|ga|1|g%3A%22org.apache.spark%22%20AND%20v%3A%221.6.1%22
3.新建log/KafkaTest.scala 代码如下
[code=language-scala]import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils /** * flume+kafka+SparkStreaming 实时 nginx 日志获取 * Created by eric on 16/6/29. */ object KafkaLog { def main(agrs: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[1]").setAppName("StreamingTest") val ssc = new StreamingContext(sparkConf, Seconds(20))//代表一个给定的秒数的实例 val topic = "HappyBirthDayToAnYuan" val topicSet = topic.split(" ").toSet //用 brokers and topics 创建 direct kafka stream val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092") //直接从 kafka brokers 拉取信息,而不使用任何接收器. val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicSet ) val lines = messages.map(_._2) lines.print() val words: DStream[String] = lines.flatMap(_.split("\n")) words.count().print() //启动 ssc.start() ssc.awaitTermination() } }
4.Test
访问本地页面产生日志 http://www.eric.aysaas.com/app/admin在这20秒内总共产生的日志行数为:
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- Shell日志分析常用命令和例子
- Flume环境部署和配置详解及案例大全
- shell脚本实现的网站日志分析统计(可以统计9种数据)
- Node.js和MongoDB实现简单日志分析系统
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- 使用java代码提交Spark的hive sql任务,run as java application
- Spark机器学习(一) -- Machine Learning Library (MLlib)
- Spark机器学习(二) 局部向量 Local-- Data Types - MLlib
- Spark机器学习(三) Labeled point-- Data Types
- Play! Akka Flume实现的完整数据收集
- Spark初探