Spark Source Code Analysis: Data Processing and Flow in Spark Streaming
2017-09-21 15:14
In the earlier post on receiver startup, Spark Source Code Analysis: Receiver Startup in Spark Streaming, we came across the onStart method of ReceiverSupervisor, which is invoked from ReceiverSupervisor#start:
// ReceiverSupervisor#start: onStart() runs before the receiver itself is started
def start() {
  onStart()
  startReceiver()
}
Let's first review how this method gets triggered:
StreamingContext#start
  JobScheduler#start
    ReceiverTracker#start
      launchReceivers()  // sketched below
        endpoint.send(StartAllReceivers(receivers)) // endpoint here refers to the ReceiverTrackerEndpoint
    JobGenerator#start
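As a quick reminder of the last hop in that chain, here is an abridged sketch of ReceiverTracker#launchReceivers (trimmed from the Spark source; logging and error handling mostly omitted):

// Abridged ReceiverTracker#launchReceivers: build one Receiver per ReceiverInputDStream,
// then ask the ReceiverTrackerEndpoint to start them all
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }
  runDummySparkJob()  // run a tiny job first so executors register and receivers spread evenly
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}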
When the ReceiverTrackerEndpoint inside ReceiverTracker receives the StartAllReceivers message, it calls the startReceiver method:
startReceiver(receiver, executors)
In this method, the receiver is wrapped into a single-element RDD (with the scheduled executors as its preferred locations), a startReceiverFunc function is defined to run against that RDD, and the two are assembled into a Spark job that is submitted to run on the executor. It is inside startReceiverFunc that the supervisor.start() method gets called.
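Roughly, the core of ReceiverTracker#startReceiver looks like this (abridged from the Spark source; scheduling details, restart handling and logging omitted):

// Wrap the single receiver into a one-partition RDD, using the scheduled executors
// as the RDD's preferred locations
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }

// The function that runs on the executor: it creates a ReceiverSupervisorImpl for the
// receiver and calls supervisor.start(), which in turn triggers onStart() and startReceiver()
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    val receiver = iterator.next()
    val supervisor = new ReceiverSupervisorImpl(
      receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
    supervisor.start()
    supervisor.awaitTermination()
  }

// Submit the receiver RDD plus startReceiverFunc as a long-running job on the executor
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())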
So now we know that this start method is invoked as soon as the receiver is launched. In the earlier post we skipped over onStart and went straight to startReceiver, so let's now look at ReceiverSupervisorImpl#onStart:
override protected def onStart() {
  // This starts every registered BlockGenerator
  registeredBlockGenerators.foreach { _.start() }
}
registeredBlockGenerators is simply a buffer of BlockGenerators:
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
When ReceiverSupervisorImpl is constructed, it creates a default BlockGenerator and adds it to this buffer.
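In the Spark source this registration happens roughly as follows (abridged):

// Abridged from ReceiverSupervisorImpl: a default BlockGenerator is created at construction
// time, using defaultBlockGeneratorListener (shown further below) as its listener
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Drop generators that have already been stopped, then register a fresh one
  registeredBlockGenerators --= registeredBlockGenerators.filter { _.isStopped() }
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}

With that in place, let's look at the BlockGenerator class itself.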
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  // blockInterval defaults to 200ms
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  // Effectively calls updateCurrentBuffer once every blockInterval (200ms by default)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  // The capacity of blocksForPushing is configurable: blockQueueSize defaults to 10
  // and can be raised or lowered as needed
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)

  // Background thread: once started it runs keepPushingBlocks, which keeps taking
  // blocks out of the blocksForPushing queue
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  // currentBuffer holds the raw records received so far in the current block interval
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      // BlockGenerator's start method simply starts its two key background components:
      // blockIntervalTimer, which packages the raw data in currentBuffer into blocks,
      // and blockPushingThread, which takes blocks from blocksForPushing and pushes them
      // out via pushArrayBuffer
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }

  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        // Swap currentBuffer out and replace it with a fresh, empty buffer
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          // Create a unique blockId based on the trigger time
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          // Wrap the swapped-out buffer into a block
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }
      if (newBlock != null) {
        // Put the block into the blocksForPushing queue
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }

  /** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        // Poll the head of the blocksForPushing queue with a 10ms timeout
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          // Got a block: call pushBlock to push it out
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }

  private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }
}
Let's start with its start method. It essentially just starts two components: blockIntervalTimer and blockPushingThread.
blockIntervalTimer is a timer whose job is to periodically package the raw data accumulated in currentBuffer into blocks. From its definition you can see that it invokes updateCurrentBuffer, which swaps currentBuffer out (replacing it with a fresh, empty buffer), wraps the old buffer into a block, and puts that block into blocksForPushing. In other words, one block holds all the data received within one block interval (200ms by default).
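For completeness (this post does not quote it): the data lands in currentBuffer because the receiver's store() calls are routed, via ReceiverSupervisorImpl#pushSingle, to BlockGenerator#addData, which looks roughly like this in the Spark source (abridged):

// Appends one record to currentBuffer, subject to rate limiting
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()  // RateLimiter: block if we are receiving faster than the configured rate
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}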
blockPushingThread is a thread; calling its start method launches it, and its run method calls keepPushingBlocks, which keeps polling the blocksForPushing queue for blocks. Each time it gets a block it calls pushBlock(block), which ultimately calls back into ReceiverSupervisorImpl's onPushBlock:
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    // Push the block by calling pushArrayBuffer
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}
pushArrayBuffer eventually calls pushAndReportBlock, which writes the block into the BlockManager; the detailed walkthrough of that code is in the previous post.
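As a quick refresher (abridged and version-dependent; see the previous post for the full analysis), the two methods in ReceiverSupervisorImpl look roughly like this:

// Wrap the ArrayBuffer as a ReceivedBlock and hand it to pushAndReportBlock
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]) {
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}

// Store the block via the configured ReceivedBlockHandler (BlockManager-based, or
// WAL-based if the write-ahead log is enabled), then report it to the driver-side
// ReceiverTracker so it can be assigned to a batch later
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // Tell the ReceiverTracker about the new block (askWithRetry/askSync depending on the Spark version)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
}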
Next, let's look at how jobs are submitted. Notice that when JobScheduler is initialized it also creates another object, JobGenerator, and JobGenerator contains a timer that fires periodically:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
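RecurringTimer itself is just a thin wrapper around a background thread. The class below is not the Spark implementation, only a minimal sketch (with a made-up name, SimpleRecurringTimer) of the idea: wake up at every multiple of the period and invoke the callback with that timestamp.

// Minimal illustration of the recurring-timer idea; the real RecurringTimer also supports
// a pluggable Clock, explicit start times, and graceful stop semantics
class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit = {
      // Align the first trigger to the next multiple of `period`
      var nextTime = (System.currentTimeMillis() / period + 1) * period
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)  // e.g. post GenerateJobs(new Time(nextTime))
        nextTime += period
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}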
Every batchDuration the timer posts a GenerateJobs message; let's see how that message is handled:
case GenerateJobs(time) => generateJobs(time)
That is, it calls the generateJobs method:
// Invoked by the timer once per batch; `time` marks the batch interval being generated
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try {
    // First ask the ReceiverTracker to allocate the blocks received during this interval
    // to the current batch (these blocks later back the BlockRDD for the batch)
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Then have the DStreamGraph generate the jobs for this batch, based on the DStream
    // dependencies and the operators we defined
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // Look up which input blocks belong to this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Submit the jobs through the JobScheduler; their input data is that batch of blocks
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
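To close the loop on how the job is submitted: jobScheduler.submitJobSet hands each generated job to a thread pool, which eventually runs it (the pool size is controlled by spark.streaming.concurrentJobs, default 1). Abridged from the Spark source:

// JobScheduler#submitJobSet (abridged): record the JobSet and hand each Job to the
// jobExecutor thread pool, wrapped in a JobHandler whose run() eventually calls job.run()
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}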