您的位置:首页 > 其它

Spark Streaming结合Flume、Kafka最新最全日志分析

2016-06-23 00:00 846 查看
摘要: 这是一份良好的可以执行的日志分析,Spark Streaming、Flume、Kafka 最新最全日志分析,请按照要求建立项目工程.关于jar包,建议自己编译.如果还有问题,实在 搞不定的,请联系我给你们编译好的相关jar包.一定要自己先动手尝试.

Spark Streaming结合Flume、Kafka最新最全日志分析

1.修改相应的配置文件

按照 http://my.oschina.net/sunmin/blog/692994

整合安装Flume+Kafka+SparkStreaming

将flume/conf/producer.conf将需要监控的日志输出文件修改为本地的log 路径:
/var/log/nginx/www.eric.aysaas.com-access.log


2.导入相关 jar 包

(快捷键 Ctrl + Alt + Shift + s),点击Project Structure界面左侧的“Modules”显示下图界面



jar 包自己编译,或者去载 http://search.maven.org/#search|ga|1|g%3A%22org.apache.spark%22%20AND%20v%3A%221.6.1%22

3.新建log/KafkaTest.scala 代码如下

[code=language-scala]import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
* flume+kafka+SparkStreaming 实时 nginx 日志获取
* Created by eric on 16/6/29.
*/
object KafkaLog {

def main(agrs: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[1]").setAppName("StreamingTest")
val ssc = new StreamingContext(sparkConf, Seconds(20))//代表一个给定的秒数的实例

val topic = "HappyBirthDayToAnYuan"
val topicSet = topic.split(" ").toSet

//用 brokers and topics 创建 direct kafka stream
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

//直接从 kafka brokers 拉取信息,而不使用任何接收器.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
)
val lines = messages.map(_._2)
lines.print()
val words: DStream[String] = lines.flatMap(_.split("\n"))
words.count().print()

//启动
ssc.start()
ssc.awaitTermination()
}
}


4.Test

访问本地页面产生日志 http://www.eric.aysaas.com/app/admin
在这20秒内总共产生的日志行数为:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息