Spark and Kafka Integration Source Analysis: How Spark Streaming Receives Data from Kafka
2015-12-18 17:33
Overview:
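Getting Spark Streaming to receive data from Kafka takes the following steps. All of the classes involved live in the package org.apache.spark.streaming.kafka.
1. A createStream() function with return type ReceiverInputDStream ends by constructing and returning a KafkaInputDStream object.
2. KafkaInputDStream extends ReceiverInputDStream and implements its getReceiver() function, which constructs a KafkaReceiver object.
3. KafkaReceiver is the class that does the real work. The layers before it do nothing substantive and only pass the job along. This resembles a project where several levels of managers plan and delegate while only a few people at the bottom actually do the work. The layering is still worthwhile, because it keeps the overall structure clear. KafkaReceiver:
a. sets the Kafka parameters
b. sets the address of the ZooKeeper ensemble that stores Kafka's metadata, and connects to ZooKeeper
c. sets up the classes that deserialize the Kafka data
d. calls the Kafka consumer API to fetch the data
e. creates a thread pool that stores the fetched data in Spark
f. shuts down the thread pool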
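The main work in receiving data from Kafka is therefore:
1. In the Receiver:
a. Consume the data in the message queue, one record at a time.
b. Call the Receiver's store() function to hand each record to Spark. Spark keeps the record in memory and/or on disk according to the receiver's StorageLevel.
2. Wire up the relationships between createStream, ReceiverInputDStream, KafkaInputDStream, KafkaReceiver and Receiver.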
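The following is a minimal pure-Scala model of the chain described above. It is only a sketch under simplifying assumptions. SimpleReceiver, SimpleReceiverInputDStream, InMemoryKafkaReceiver and ReceiverChainSketch are hypothetical stand-ins. Their methods mirror the Spark names (createStream, getReceiver, onStart, onStop, store), but they are not Spark's real classes. An in-memory Seq plays the part of Kafka.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's Receiver; NOT the real Spark API.
abstract class SimpleReceiver[T] {
  // Plays the role of Spark's storage: in Spark, store() hands data to the BlockManager.
  val stored = ArrayBuffer.empty[T]
  def store(item: T): Unit = stored += item
  def onStart(): Unit
  def onStop(): Unit
}

// Hypothetical stand-in for ReceiverInputDStream: subclasses only say which receiver does the work.
abstract class SimpleReceiverInputDStream[T] {
  def getReceiver(): SimpleReceiver[T]
}

// Hypothetical receiver that "consumes" an in-memory source instead of Kafka.
class InMemoryKafkaReceiver(source: Seq[(String, String)])
    extends SimpleReceiver[(String, String)] {
  // Consume each record and store it as a (key, value) pair.
  def onStart(): Unit = source.foreach(store)
  def onStop(): Unit = ()
}

// Like KafkaInputDStream: its only job is to build the receiver.
class SimpleKafkaInputDStream(source: Seq[(String, String)])
    extends SimpleReceiverInputDStream[(String, String)] {
  def getReceiver(): SimpleReceiver[(String, String)] = new InMemoryKafkaReceiver(source)
}

object ReceiverChainSketch {
  // Like KafkaUtils.createStream: constructs and returns the input DStream, nothing more.
  def createStream(source: Seq[(String, String)]): SimpleReceiverInputDStream[(String, String)] =
    new SimpleKafkaInputDStream(source)

  // Stands in for Spark's scheduling: fetch the receiver, start it, then stop it.
  def runOnce(source: Seq[(String, String)]): Seq[(String, String)] = {
    val receiver = createStream(source).getReceiver()
    receiver.onStart()
    receiver.onStop()
    receiver.stored.toList
  }

  def main(args: Array[String]): Unit =
    println(runOnce(Seq(("k1", "hello world"), ("k2", "hello spark"))))
}
```

In this model, only the receiver touches data. createStream and the input DStream merely construct the next object down.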
Detailed analysis:
1. The KafkaWordCount example from the official Spark examples:
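    package org.apache.spark.examples.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }

        // Helper from the same examples package
        StreamingExamples.setStreamingLogLevels()

        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // Required: reduceByKeyAndWindow with an inverse function needs checkpointing
        ssc.checkpoint("checkpoint")

        // topic -> number of consumer threads
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        // createStream yields (key, message) pairs; keep only the message
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

2. Next, the main subject of this analysis: the createStream() function called in KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2):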
    /**
     * Create an input stream that pulls messages from a Kafka Broker.
     * @param ssc          StreamingContext object
     * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
     * @param groupId      The group id for this consumer
     * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
     *                     in its own thread
     * @param storageLevel Storage level to use for storing the received objects
     *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
     */
    def createStream(
        ssc: StreamingContext,
        zkQuorum: String,
        groupId: String,
        topics: Map[String, Int],
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[(String, String)] = {
      val kafkaParams = Map[String, String](
        "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
        "zookeeper.connection.timeout.ms" -> "10000")
      createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics, storageLevel)
    }

This overload fills in the Kafka parameters and fixes both key and value to String, decoded with StringDecoder. It then calls createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel):

    def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
        ssc: StreamingContext,
        kafkaParams: Map[String, String],
        topics: Map[String, Int],
        storageLevel: StorageLevel
      ): ReceiverInputDStream[(K, V)] = {
      val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
      new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
    }

The function ends by returning a KafkaInputDStream object, so KafkaInputDStream is the next class to examine.
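The String/String overload mostly assembles kafkaParams and reads one configuration flag. The sketch below isolates just those two pieces. It assumes a plain Map[String, String] in place of SparkConf, whereas the real code calls ssc.conf.getBoolean. The object and method names are hypothetical.

```scala
object CreateStreamParamsSketch {
  // Same three entries as the Map built by the String/String createStream overload.
  def kafkaParams(zkQuorum: String, groupId: String): Map[String, String] = Map(
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000")

  // Stand-in for ssc.conf.getBoolean(key, false): absent means false.
  def walEnabled(conf: Map[String, String]): Boolean =
    conf.get("spark.streaming.receiver.writeAheadLog.enable").exists(_.toBoolean)

  def main(args: Array[String]): Unit = {
    println(kafkaParams("zk1:2181,zk2:2181", "wordcount-group"))
    println(walEnabled(Map.empty))
    println(walEnabled(Map("spark.streaming.receiver.writeAheadLog.enable" -> "true")))
  }
}
```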
3. The KafkaInputDStream class
    /**
     * Input stream that pulls messages from a Kafka Broker.
     *
     * @param kafkaParams Map of kafka configuration parameters.
     *                    See: http://kafka.apache.org/configuration.html
     * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
     * in its own thread.
     * @param storageLevel RDD storage level.
     */
    private[streaming]
    class KafkaInputDStream[
      K: ClassTag,
      V: ClassTag,
      U <: Decoder[_]: ClassTag,
      T <: Decoder[_]: ClassTag](
        @transient ssc_ : StreamingContext,
        kafkaParams: Map[String, String],
        topics: Map[String, Int],
        useReliableReceiver: Boolean,
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

      def getReceiver(): Receiver[(K, V)] = {
        if (!useReliableReceiver) {
          new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
        } else {
          new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
        }
      }
    }

The class does two things:
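a. It extends ReceiverInputDStream[(K, V)](ssc_).
b. It implements getReceiver() from ReceiverInputDStream. getReceiver() returns one of two receivers, not both. It returns a ReliableKafkaReceiver when useReliableReceiver is true (that is, when spark.streaming.receiver.writeAheadLog.enable is set). Otherwise it returns a KafkaReceiver. Both work on the same principle, so it is enough to examine KafkaReceiver.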
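getReceiver() picks a single receiver per call, so the branch can be modeled in a few lines. This is a hypothetical sketch. The case objects only label which of Spark's two receiver classes would be constructed.

```scala
object GetReceiverSketch {
  sealed trait ReceiverKind
  case object PlainKafkaReceiver extends ReceiverInputKindMarker // stands in for KafkaReceiver
  case object ReliableReceiver extends ReceiverInputKindMarker   // stands in for ReliableKafkaReceiver

  // Hypothetical marker so both labels share the ReceiverKind type.
  sealed trait ReceiverInputKindMarker extends ReceiverKind

  // Same branching as KafkaInputDStream.getReceiver(); createStream passes in the value
  // of spark.streaming.receiver.writeAheadLog.enable as useReliableReceiver.
  def getReceiver(useReliableReceiver: Boolean): ReceiverKind =
    if (!useReliableReceiver) PlainKafkaReceiver else ReliableReceiver

  def main(args: Array[String]): Unit = {
    println(getReceiver(useReliableReceiver = false))
    println(getReceiver(useReliableReceiver = true))
  }
}
```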
4. Now look at the KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) constructed there:
    private[streaming]
    class KafkaReceiver[
      K: ClassTag,
      V: ClassTag,
      U <: Decoder[_]: ClassTag,
      T <: Decoder[_]: ClassTag](
        kafkaParams: Map[String, String],
        topics: Map[String, Int],
        storageLevel: StorageLevel
      ) extends Receiver[(K, V)](storageLevel) with Logging {

      // Connection to Kafka
      var consumerConnector: ConsumerConnector = null

      def onStop() {
        if (consumerConnector != null) {
          consumerConnector.shutdown()
          consumerConnector = null
        }
      }

      def onStart() {
        logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

        // Kafka connection properties
        val props = new Properties()
        kafkaParams.foreach(param => props.put(param._1, param._2))

        val zkConnect = kafkaParams("zookeeper.connect")
        // Create the connection to the cluster
        logInfo("Connecting to Zookeeper: " + zkConnect)
        val consumerConfig = new ConsumerConfig(props)
        consumerConnector = Consumer.create(consumerConfig)
        logInfo("Connected to " + zkConnect)

        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
          .newInstance(consumerConfig.props)
          .asInstanceOf[Decoder[K]]
        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
          .newInstance(consumerConfig.props)
          .asInstanceOf[Decoder[V]]

        // Create threads for each topic/message Stream we are listening
        val topicMessageStreams = consumerConnector.createMessageStreams(
          topics, keyDecoder, valueDecoder)

        val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
        try {
          // Start the messages handler for each partition
          topicMessageStreams.values.foreach { streams =>
            streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
          }
        } finally {
          executorPool.shutdown() // Just causes threads to terminate after work is done
        }
      }

      // Handles Kafka messages
      private class MessageHandler(stream: KafkaStream[K, V])
        extends Runnable {
        def run() {
          logInfo("Starting MessageHandler.")
          try {
            val streamIterator = stream.iterator()
            while (streamIterator.hasNext()) {
              val msgAndMetadata = streamIterator.next()
              // Store a single item of received data to Spark's memory
              store((msgAndMetadata.key, msgAndMetadata.message))
            }
          } catch {
            case e: Throwable => logError("Error handling message; exiting", e)
          }
        }
      }
    }

The constructed KafkaReceiver object does the main work. It extends Receiver[(K, V)](storageLevel), so it has to implement Receiver's onStart() and onStop() functions.
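Steps a and c of onStart() work without Spark or Kafka on the classpath, so they can be tried in isolation. Step a copies kafkaParams into a Properties object, and step c instantiates the decoders by reflection from their ClassTag. In this sketch, Decoder and Utf8StringDecoder are hypothetical stand-ins for kafka.serializer.Decoder and StringDecoder. java.util.Properties replaces Kafka's VerifiableProperties as the constructor argument. The reflective call only succeeds if the decoder class has a public constructor taking exactly that one parameter type. That is why any decoder class used with this receiver needs such a constructor.

```scala
import java.util.Properties
import scala.reflect.{classTag, ClassTag}

// Hypothetical stand-in for kafka.serializer.Decoder (not the real Kafka trait).
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// Hypothetical StringDecoder-like class: one constructor taking the consumer properties.
class Utf8StringDecoder(props: Properties) extends Decoder[String] {
  private val encoding = props.getProperty("serializer.encoding", "UTF8")
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, encoding)
}

object DecoderReflectionSketch {
  // Step a: copy kafkaParams into java.util.Properties (KafkaReceiver uses props.put).
  def toProperties(kafkaParams: Map[String, String]): Properties = {
    val props = new Properties()
    kafkaParams.foreach { case (k, v) => props.setProperty(k, v) }
    props
  }

  // Step c: build the decoder named by type parameter U through its one-argument
  // constructor, the same getConstructor(...).newInstance(...) pattern as in onStart().
  def makeDecoder[K, U <: Decoder[_]: ClassTag](props: Properties): Decoder[K] =
    classTag[U].runtimeClass
      .getConstructor(classOf[Properties])
      .newInstance(props)
      .asInstanceOf[Decoder[K]]

  def main(args: Array[String]): Unit = {
    val props = toProperties(Map("group.id" -> "g", "zookeeper.connect" -> "zk:2181"))
    val decoder = makeDecoder[String, Utf8StringDecoder](props)
    println(decoder.fromBytes("hello".getBytes("UTF-8")))
  }
}
```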
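The job of onStart() is to move the data from Kafka into Spark:
a. Set the Kafka parameters.
b. Set the address of the ZooKeeper ensemble that stores Kafka's metadata, and connect to ZooKeeper.
c. Set up the classes that deserialize the Kafka data.
d. Call the Kafka consumer API to fetch the data.
e. Create a thread pool that stores the fetched stream data in Spark. The call store((msgAndMetadata.key, msgAndMetadata.message)) is a Receiver method, and it hands each message to Spark as a key-value pair. This key-value storage is why KafkaUtils.createStream(ssc, zkQuorum, group, topicMap) returns key-value pairs. I previously wrote a Java program in which Spark received Kafka data, and calling this interface returned key-value pair objects. I was unsure why at the time, but this step explains it.
f. Shut down the thread pool. As the source comment says, shutdown() only lets the threads terminate once their work is done. The MessageHandler tasks already submitted keep consuming.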
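Steps d to f follow a common pattern: a fixed-size pool with one task per stream, where each task drains its iterator and calls store(). The sketch below reproduces that pattern with plain JDK executors. It assumes Scala Iterators in place of KafkaStream and a synchronized buffer in place of Receiver.store(). MessageHandlerPoolSketch and receive are hypothetical names. KafkaReceiver creates daemon threads with Spark's Utils.newDaemonFixedThreadPool, while this sketch substitutes Executors.newFixedThreadPool. According to the Spark Kafka integration guide, the per-topic numbers in the topics map only set how many consumer threads a single receiver runs. They do not increase Spark's processing parallelism.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object MessageHandlerPoolSketch {
  type Record = (String, String)

  // Like KafkaReceiver's MessageHandler: drain one stream, storing each (key, value).
  class MessageHandler(stream: Iterator[Record], store: Record => Unit) extends Runnable {
    def run(): Unit =
      try {
        while (stream.hasNext) store(stream.next())
      } catch {
        case e: Throwable => System.err.println(s"Error handling message; exiting: $e")
      }
  }

  // topicStreams models createMessageStreams(...): topic -> its consumer streams.
  def receive(topicStreams: Map[String, Seq[Iterator[Record]]]): List[Record] = {
    val stored = ArrayBuffer.empty[Record]
    // Stand-in for Receiver.store(); synchronized because several handler threads call it.
    val store: Record => Unit = record => stored.synchronized { stored += record; () }

    // KafkaReceiver sizes its pool with topics.values.sum: one thread per stream.
    val poolSize = topicStreams.values.map(_.size).sum
    val executorPool = Executors.newFixedThreadPool(math.max(poolSize, 1))
    try {
      topicStreams.values.foreach { streams =>
        streams.foreach(stream => executorPool.submit(new MessageHandler(stream, store)))
      }
    } finally {
      executorPool.shutdown() // no new tasks accepted; submitted handlers keep running
    }
    // Demo only: wait for the handlers so the result can be returned.
    // KafkaReceiver.onStart() does not wait; its handlers run for the receiver's lifetime.
    executorPool.awaitTermination(10, TimeUnit.SECONDS)
    stored.synchronized(stored.toList)
  }

  def main(args: Array[String]): Unit = {
    val streams = Map(
      "topicA" -> Seq(Iterator(("k1", "a1"), ("k2", "a2")), Iterator(("k3", "a3"))),
      "topicB" -> Seq(Iterator(("k4", "b1"))))
    receive(streams).foreach(println)
  }
}
```

The output order varies between runs, because the handlers run concurrently.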
onStop() simply shuts down the consumer's connection to Kafka.
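In KafkaReceiver.onStop(), the null check plus the reset to null make the method safe to call more than once, or before onStart() has connected. This matters because Spark may stop and restart a receiver, for example through Receiver.restart(), which calls onStop() followed by onStart(). The sketch below uses a hypothetical FakeConnector standing in for Kafka's ConsumerConnector. SketchReceiver is likewise hypothetical.

```scala
object OnStopSketch {
  // Hypothetical stand-in for Kafka's ConsumerConnector; counts shutdown() calls.
  class FakeConnector {
    var shutdownCalls = 0
    def shutdown(): Unit = shutdownCalls += 1
  }

  class SketchReceiver(connect: () => FakeConnector) {
    var consumerConnector: FakeConnector = null

    def onStart(): Unit = consumerConnector = connect()

    // Same shape as KafkaReceiver.onStop(): shut down once, then drop the reference,
    // so a second call (or a call before onStart) does nothing.
    def onStop(): Unit = {
      if (consumerConnector != null) {
        consumerConnector.shutdown()
        consumerConnector = null
      }
    }
  }

  // Starts once, stops twice, and reports how often the connector was really shut down.
  def shutdownCountAfterDoubleStop(): Int = {
    val connector = new FakeConnector
    val receiver = new SketchReceiver(() => connector)
    receiver.onStart()
    receiver.onStop()
    receiver.onStop()
    connector.shutdownCalls
  }

  def main(args: Array[String]): Unit = {
    new SketchReceiver(() => new FakeConnector).onStop() // stop before start: no exception
    println(shutdownCountAfterDoubleStop())
  }
}
```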
From there, control returns up through the layers, and the data becomes available through the stream object returned by createStream.
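Each stored item is a (key, message) tuple, so createStream returns a stream of pairs, which is why KafkaWordCount starts with .map(_._2). The sketch below runs that example's word counting on a local Seq instead of a DStream. KeyValueSketch and wordCounts are hypothetical names, and the windowing done by reduceByKeyAndWindow is left out. The sketch also assumes that a message sent without a key arrives with a null key, as is usual with Kafka's high-level consumer.

```scala
object KeyValueSketch {
  // What one batch from createStream holds: (key, message) pairs; the first has no key.
  val received: Seq[(String, String)] = Seq((null, "hello spark"), ("user-1", "hello kafka"))

  // KafkaWordCount's transformations applied to a local Seq instead of a DStream.
  def wordCounts(records: Seq[(String, String)]): Map[String, Long] =
    records
      .map(_._2)                  // keep only the message, like .map(_._2) in the example
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .groupBy(_._1)
      .map { case (word, ones) => (word, ones.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(wordCounts(received))
}
```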
This completes the walkthrough of how Spark receives and consumes data from Kafka.