您的位置:首页 > 运维架构 > 网站架构

第11课:Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究

2016-05-19 11:07 603 查看
上节课将到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。
一、 ReceiverTracker主要的功能:
在Executor上启动Receivers。

停止Receivers 。

更新Receiver接收数据的速率(也就是限流)

不断的等待Receivers的运行状态,只要Receivers停止运行,就重新启动Receiver。也就是Receiver的容错功能。

接受Receiver的注册。

借助ReceivedBlockTracker来管理Receiver接收数据的元数据。

汇报Receiver发送过来的错误信息

ReceiverTracker 管理了一个消息通讯体ReceiverTrackerEndpoint,用来与Receiver或者ReceiverTracker 进行消息通信。
在ReceiverTracker的start方法中,实例化了ReceiverTrackerEndpoint,并且在Executor上启动Receivers:
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}

if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将Receiver封装成RDD以job的方式提交给集群运行。
endpoint.send(StartAllReceivers(receivers))
这里的endpoint就是ReceiverTrackerEndpoint的引用。

Receiver启动后,会向ReceiverTracker注册,注册成功才算正式启动了。
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
当Receiver端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker:

/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}


当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据:
case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
context.reply(addBlock(receivedBlockInfo))
} else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
}
}
})
} else {
context.reply(addBlock(receivedBlockInfo))
}
数据的元数据是交由ReceivedBlockTracker管理的。
数据最终被写入到streamIdToUnallocatedBlockQueues中:一个流对应一个数据块信息的队列。
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]


每当Streaming 触发job时,会将队列中的数据分配成一个batch,并将数据写入timeToAllocatedBlocks数据结构。
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
....
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
} else {
// This situation occurs when:
// 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
// possibly processed batch job or half-processed batch job need to be processed again,
// so the batchTime will be equal to lastAllocatedBatchTime.
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
}
可见一个batch会包含多个流的数据。

每当Streaming 的一个job运行完毕后:
private def handleJobCompletion(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
job.setEndTime(completedTime)
listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
...
JobScheduler会调用handleJobCompletion方法,最终会触发
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)


这里的maxRememberDuration是DStream中每个时刻生成的RDD保留的最长时间。
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
} else {
logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
}
}
而最后
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
这个代码会调用
case batchCompleted: StreamingListenerBatchCompleted =>
listener.onBatchCompleted(batchCompleted)

... 一路跟着下去...

/**
* A RateController that sends the new rate to receivers, via the receiver tracker.
*/
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit =
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
if (isTrackerStarted) {
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
}
}
case UpdateReceiverRateLimit(streamUID, newRate) =>
for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
eP.send(UpdateRateLimit(newRate))
}
发送调整速率的消息给Receiver,Receiver接到消息后,最终通过BlockGenerator来调整数据的写入的时间,而控制数据流的速率。
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.foreach { bg =>
bg.updateRate(eps)
}


备注:1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Tracker Receiver