spark streaming接kafka数据方式汇总
2016-03-22 17:02
363 查看
1、利用kafka高阶api
(1)常用的方法
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
这是example中的例子,这种接数据的方式有以下几个特征
1)如果group是第一次注册,那么会从当前时间在zookeeper上注册消费信息,并开始消费。
如果我再Time2的位置才开始启动程序,那么它是消费不到Time1的数据的
假设Time1处已经停止发送数据了,那么Time2处启动的sparkStreaming程序是消费不到消息的
2)控制输入速率
在sparkconf中设置参数
如果想对一个topic的历史消息进行处理的时候,需要改动一下创建kafka流的方式,如下:
在kafkaParams中添加参数
然后再main函数中调用createStreaming方法即可
(1)常用的方法
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
这是example中的例子,这种接数据的方式有以下几个特征
1)如果group是第一次注册,那么会从当前时间在zookeeper上注册消费信息,并开始消费。
如果我再Time2的位置才开始启动程序,那么它是消费不到Time1的数据的
假设Time1处已经停止发送数据了,那么Time2处启动的sparkStreaming程序是消费不到消息的
2)控制输入速率
在sparkconf中设置参数
sparkConf.set("spark.streaming.receiver.maxRate","100")
如果想对一个topic的历史消息进行处理的时候,需要改动一下创建kafka流的方式,如下:
def createStream( ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[(String, String)] = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000", "auto.offset.reset"->"smallest") KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics, storageLevel) }
在kafkaParams中添加参数
"auto.offset.reset"->"smallest"
然后再main函数中调用createStreaming方法即可
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- Spark HA部署方案
- Spark HA原理架构图
- spark内存概述
- Spark Shuffle之Hash Shuffle
- Spark Shuffle之Sort Shuffle
- Spark Shuffle之Tungsten Sort Shuffle
- 编译Spark 1.5.2