
Spark-Kafka Integration Source Code Analysis: How Spark Streaming Receives Data from Kafka

2015-12-18 17:33
Overview:
To have Spark Streaming receive data from Kafka, the work breaks down into the following steps (the classes involved live in the package org.apache.spark.streaming.kafka):

1. The createStream() function is the entry point. Its declared return type is ReceiverInputDStream, and at the end it returns a newly constructed KafkaInputDStream object.

2. KafkaInputDStream extends ReceiverInputDStream and implements its getReceiver() function; getReceiver() constructs a KafkaReceiver object.

3. KafkaReceiver is the class that does the real work. The layers above it do little of substance on their own; it is like some projects at work, where layers of leads plan and delegate while a handful of people at the bottom actually get things done. Those layers still matter, though, because they keep the overall structure clear. Concretely, KafkaReceiver:

  a. Sets the Kafka-related parameters

  b. Sets the address of the ZooKeeper ensemble that stores Kafka metadata, and connects to ZooKeeper

  c. Sets the classes used to deserialize the data from Kafka

  d. Calls the Kafka consumer API to fetch data

  e. Creates a thread pool that stores the received stream data into Spark

  f. Shuts down the thread pool

The main work of receiving data from Kafka in Spark Streaming is therefore:

1. In the Receiver:

  a. Consume the data in the message queue, record by record.

  b. Call the Receiver's store() function to put each record into Spark's memory.

2. Keep the relationships among createStream, ReceiverInputDStream, KafkaInputDStream, KafkaReceiver, and Receiver straight; a minimal sketch of this pattern follows.
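
Below is a minimal sketch of that pattern, not actual Spark source: MyInputDStream and MyReceiver are hypothetical names; only ReceiverInputDStream, Receiver, getReceiver(), onStart(), onStop() and store() are the real Spark APIs this article walks through.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical input DStream: Spark calls getReceiver() and runs the returned
// Receiver on an executor.
class MyInputDStream(ssc_ : StreamingContext)
  extends ReceiverInputDStream[String](ssc_) {
  def getReceiver(): Receiver[String] =
    new MyReceiver(StorageLevel.MEMORY_AND_DISK_SER_2)
}

// Hypothetical receiver: onStart() pulls data from the source and calls store(),
// which is what pushes each record into Spark's memory; onStop() releases resources.
class MyReceiver(storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {
  def onStart(): Unit = {
    // In the Kafka case this is where the consumer connection and a thread pool
    // are created; here a single placeholder record is stored.
    store("a single received record")
  }
  def onStop(): Unit = {
    // In the Kafka case the ConsumerConnector is shut down here.
  }
}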

Detailed analysis:

1. The KafkaWordCount example from the official Spark examples:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // Log-level helper from the Spark examples package.
    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // One consumer thread per topic, then a windowed word count over the message values.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
2. The key piece to analyze is the createStream() function in KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2):

/**
 * Create an input stream that pulls messages from a Kafka Broker.
 * @param ssc       StreamingContext object
 * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId   The group id for this consumer
 * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *                  in its own thread
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)] = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000")
  createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, storageLevel)
}
Step into createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel):
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[(K, V)] = {
  val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
  new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
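
For reference, a hedged sketch of calling this generic overload directly; the ZooKeeper addresses, group id and topic name are placeholders, and ssc is the StreamingContext from the KafkaWordCount example above.

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder connection settings, mirroring the kafkaParams built by the simpler overload above.
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "zk1:2181,zk2:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000")
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)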
The function finally returns a KafkaInputDStream object, so step into KafkaInputDStream.

3. The code of the KafkaInputDStream class:

/**
 * Input stream that pulls messages from a Kafka Broker.
 *
 * @param kafkaParams Map of kafka configuration parameters.
 *                    See: http://kafka.apache.org/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread.
 * @param storageLevel RDD storage level.
 */
private[streaming]
class KafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag](
    @transient ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}
This class does two main things:

a. KafkaInputDStream extends ReceiverInputDStream[(K, V)](ssc_).

b. It implements the getReceiver() function from ReceiverInputDStream. Depending on whether the write-ahead log is enabled, getReceiver() returns one of two receivers, KafkaReceiver or ReliableKafkaReceiver; they work on the same principle, so it is enough to look at KafkaReceiver. A sketch of enabling the reliable path follows.
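
A minimal sketch of flipping that switch, based on the spark.streaming.receiver.writeAheadLog.enable key read in createStream above; the application name and checkpoint path are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With the write-ahead log enabled, getReceiver() returns ReliableKafkaReceiver.
val sparkConf = new SparkConf()
  .setAppName("KafkaWordCount")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// The write-ahead log lives under the checkpoint directory, so point the
// checkpoint at reliable storage such as HDFS (placeholder path).
ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")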

4. Now look at the KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) that gets constructed:

private[streaming]
class KafkaReceiver[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ) extends Receiver[(K, V)](storageLevel) with Logging {

  // Connection to Kafka
  var consumerConnector: ConsumerConnector = null

  def onStop() {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }
  }

  def onStart() {

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

    // Kafka connection properties
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))

    val zkConnect = kafkaParams("zookeeper.connect")
    // Create the connection to the cluster
    logInfo("Connecting to Zookeeper: " + zkConnect)
    val consumerConfig = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConfig)
    logInfo("Connected to " + zkConnect)

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    // Create threads for each topic/message Stream we are listening
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
    try {
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
      }
    } finally {
      executorPool.shutdown() // Just causes threads to terminate after work is done
    }
  }

  // Handles Kafka messages
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          // Store a single item of received data to Spark's memory.
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
      } catch {
        case e: Throwable => logError("Error handling message; exiting", e)
      }
    }
  }
}
The constructed KafkaReceiver object does the main work. It extends Receiver[(K, V)](storageLevel), so it must implement the Receiver's onStart() and onStop() functions.

The job of onStart() is to move the data from Kafka into Spark:

a. Set the Kafka-related parameters

b. Set the address of the ZooKeeper ensemble that stores Kafka metadata, and connect to ZooKeeper

c. Set the classes used to deserialize the data from Kafka

d. Call the Kafka consumer API to fetch data

e. Create a thread pool that stores the received stream data into Spark. The call store((msgAndMetadata.key, msgAndMetadata.message)) comes from the Receiver class and stores a single message into Spark's memory as a key-value pair. This pairing is exactly why KafkaUtils.createStream(ssc, zkQuorum, group, topicMap) returns a stream of key-value objects. When I previously wrote a Java job that received Kafka data through this API, the key-value return type puzzled me; now it is clear that the pairing happens right here.

f. Shut down the thread pool

onStop() simply shuts down the consumer's connection to Kafka.

The data is then handed back layer by layer, and in the end it is available from the object returned by createStream(), as in the short sketch below.
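
For example, reusing ssc, zkQuorum, group and topicMap from the KafkaWordCount example, the returned DStream carries (key, value) pairs and .map(_._2) keeps only the message payloads:

// createStream yields (key, value) pairs because the receiver stores
// (msgAndMetadata.key, msgAndMetadata.message) for every message.
val pairs = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = pairs.map(_._2) // drop the Kafka key, keep the message value
lines.print()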

That completes the workflow of Spark Streaming receiving and consuming data from Kafka.