您的位置:首页 > 其它

第37课:Kafka源码解读Consumer内幕解密

2016-07-03 17:04 459 查看
第37课:Kafka源码解读Consumer内幕解密

contributor:spark 2.0 bug 修复

Consumer :while 循环,线程向broker list主动抓数据,线程不断的看有没有数据。不断向leader询问:有数据吗? 一个线程消费一个partition的数据

 

设想场景:

broker(3)- topic (1)- partition(10)

获取数据,被zookeeper管理,向zookeeper请求关注的topic及partition,zookeeper根据leader

、follower的信息,将关注的topic及partition给你,每一个partition,产生一个线程取抓取,

然后不断循环,每一个partition可能会变,10个partition就有10个线程,抓取数据以后,放入

Consumer 的一个阻塞队列,10个线程不断抓到数据 一直poll,Consumer 用一个线程从阻塞队列

中拿数据,一个record,2个record,。。。。抓数据是并行的;而拿数据需要一个一个线程去拿;

 

private[kafka] object ZookeeperConsumerConnector {
val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
}

 

 

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String,List[KafkaStream[K,V]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")

val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList

val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)

loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: