
(Version Customization) Lesson 6: Spark Streaming Source Code Walkthrough: Dynamic Job Generation and In-Depth Reflections

2016-05-15 12:42
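In this lesson:
1. In-depth reflections on Spark Streaming job generation
2. A source-code walkthrough of Spark Streaming job generation

This lesson explains how jobs are produced. In Spark Streaming, the class with overall responsibility for dynamic job scheduling is JobScheduler: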
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
  ...
}
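JobScheduler has several very important members: JobGenerator, ReceiverTracker, and ListenerBus.

JobScheduler delegates the generation of each batch's RDD DAG to JobGenerator, and delegates the bookkeeping of incoming source data to ReceiverTracker. JobGenerator in turn has two crucial members: RecurringTimer and EventLoop. RecurringTimer controls when jobs are triggered: every batchInterval it places a message on the EventLoop's queue. EventLoop keeps watching that message queue and handles each message as soon as it arrives.

Every Spark Streaming application calls
ssc.start() // ssc is the StreamingContext
Calling start implicitly starts a series of modules, chiefly:
receiverTracker.start()
jobGenerator.start()
listenerBus.start()
The code of JobScheduler.start() is as follows: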
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
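The for-expression in the middle of start() iterates over a sequence of input streams and, for each one, over an optional rate controller, so streams without a rate controller are simply skipped. The following is a minimal sketch of that idiom only; `InputStreamInfo` and the string "listeners" are hypothetical stand-ins, not Spark types.

```scala
import scala.collection.mutable.ArrayBuffer

object RateControllerLoopSketch {
  // Hypothetical stand-in for an input stream that may or may not have a rate controller.
  final case class InputStreamInfo(name: String, rateController: Option[String])

  // Collects the rate controllers that would be registered as listeners.
  def attached(streams: Seq[InputStreamInfo]): List[String] = {
    val listeners = ArrayBuffer.empty[String]
    for {
      stream <- streams                    // outer generator: every input stream
      controller <- stream.rateController  // inner generator: zero or one element
    } listeners += controller
    listeners.toList
  }
}
```

Called with three streams of which only the first and third carry a controller, `attached` returns just those two controllers, in order.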
Let's look at the code of JobGenerator.start() in detail:
def start(): Unit = synchronized {
  ...
  eventLoop.start() // start the event-processing thread
  if (ssc.isCheckpointPresent) {
    restart() // not the first start: recover from the checkpoint
  } else {
    startFirstTime() // first start
  }
}
startFirstTime starts the DStreamGraph and the timer:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
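To make the arithmetic in startFirstTime concrete, here is a small sketch. It assumes that timer.getStartTime() rounds the current time up to the next multiple of the batch interval; getStartTime is not shown in this excerpt, so treat that as my reading rather than a quoted fact. The helper names `alignedStartTime` and `zeroTime` are hypothetical.

```scala
object StartTimeSketch {
  // Assumption: the first tick is the next multiple of the period after "now"
  // (valid for non-negative timestamps, where integer division is a floor).
  def alignedStartTime(nowMs: Long, periodMs: Long): Long =
    (nowMs / periodMs + 1) * periodMs

  // graph.start receives startTime - batchDuration, i.e. one batch before the first tick.
  def zeroTime(startMs: Long, periodMs: Long): Long = startMs - periodMs
}
```

With a 1000 ms batch interval and a current time of 12345 ms, the first tick lands at 13000 ms and the graph is started with 12000 ms, so the first batch covers the interval ending at 13000 ms.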
Once the RecurringTimer is started, its thread sends a message to the EventLoop at every new batchInterval:
private val thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true) // run as a background (daemon) thread
  override def run() { loop }
}

/**
 * Repeatedly call the callback every interval.
 */
private def loop() {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    triggerActionForNextInterval()
  } catch {
    case e: InterruptedException =>
  }
}

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime) // the callback is the anonymous function passed in earlier
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}
The callback here is the anonymous function passed in when the RecurringTimer is constructed:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
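The timer's behaviour can be tried out in isolation with a simplified sketch. It is not Spark's RecurringTimer: `ManualClock` and `SimpleRecurringTimer` are hypothetical classes, the clock "waits" by jumping forward instead of sleeping, and ticks are driven by explicit calls rather than a daemon thread, so that the result is deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

object RecurringTimerSketch {
  // Hypothetical clock: waiting until a target time just advances the counter.
  final class ManualClock(private var now: Long) {
    def waitTillTime(target: Long): Long = {
      if (target > now) now = target
      now
    }
  }

  // Mirrors triggerActionForNextInterval: wait, fire the callback, advance by one period.
  final class SimpleRecurringTimer(clock: ManualClock, period: Long, callback: Long => Unit) {
    private var nextTime = 0L
    def start(startTime: Long): Unit = { nextTime = startTime }
    def tick(): Unit = {
      clock.waitTillTime(nextTime)
      callback(nextTime)
      nextTime += period
    }
  }

  // The callback plays the role of eventLoop.post(GenerateJobs(...)): it records each tick time.
  def demo(): List[Long] = {
    val posted = ArrayBuffer.empty[Long]
    val timer = new SimpleRecurringTimer(new ManualClock(0L), 1000L, t => { posted += t; () })
    timer.start(2000L)
    (1 to 3).foreach(_ => timer.tick())
    posted.toList
  }
}
```

Starting at 2000 ms with a 1000 ms period, three ticks hand the callback the times 2000, 3000, and 4000, one per batch interval.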
When the EventLoop receives a message, its thread's run() method handles it:
override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take() // block until the next event is available in the queue
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
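The same queue-plus-thread pattern can be reproduced in a few lines. The sketch below is a simplified stand-in, not Spark's EventLoop: `MiniEventLoop` is a hypothetical class, and the nested error handling around onError is omitted for brevity.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object EventLoopSketch {
  abstract class MiniEventLoop[E](name: String) {
    private val queue = new LinkedBlockingQueue[E]()
    private val stopped = new AtomicBoolean(false)
    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get) {
            val event = queue.take() // blocks until an event is posted
            try onReceive(event)
            catch { case NonFatal(e) => onError(e) } // one bad event must not kill the loop
          }
        } catch {
          case _: InterruptedException => // stop() interrupts the blocking take()
        }
      }
    }
    def start(): Unit = thread.start()
    def stop(): Unit = { stopped.set(true); thread.interrupt() }
    def post(event: E): Unit = queue.put(event)
    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit
  }

  // Posts 1, 2, 3 where handling 2 throws; returns the handled events and the error count.
  def demo(): (List[Int], Int) = {
    val received = ArrayBuffer.empty[Int]
    var errors = 0
    val done = new CountDownLatch(3)
    val loop = new MiniEventLoop[Int]("demo-loop") {
      override protected def onReceive(event: Int): Unit = {
        if (event == 2) throw new IllegalStateException("boom")
        received += event
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = {
        errors += 1
        done.countDown()
      }
    }
    loop.start()
    Seq(1, 2, 3).foreach(loop.post)
    done.await(5, TimeUnit.SECONDS) // the latch also makes the loop thread's writes visible here
    loop.stop()
    (received.toList, errors)
  }
}
```

The point of the design is that posting is cheap and never runs the handler on the caller's thread: the timer thread only enqueues, and a single consumer thread processes events in order.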
Events are processed one after another by processEvent:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
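The dispatch above relies on events being case classes matched by pattern. As a rough standalone illustration, the sketch below models the four events with a sealed trait and plain Long timestamps; the real JobGeneratorEvent hierarchy uses Spark's Time type, and the `describe` function is hypothetical.

```scala
object EventDispatchSketch {
  // Sealing the trait lets the compiler warn when a match misses an event type.
  sealed trait GeneratorEvent
  final case class GenerateJobs(timeMs: Long) extends GeneratorEvent
  final case class ClearMetadata(timeMs: Long) extends GeneratorEvent
  final case class DoCheckpoint(timeMs: Long, clearCheckpointDataLater: Boolean) extends GeneratorEvent
  final case class ClearCheckpointData(timeMs: Long) extends GeneratorEvent

  // Stand-in for processEvent: returns a description instead of calling a handler.
  def describe(event: GeneratorEvent): String = event match {
    case GenerateJobs(t) => s"generateJobs($t)"
    case ClearMetadata(t) => s"clearMetadata($t)"
    case DoCheckpoint(t, later) => s"doCheckpoint($t, $later)"
    case ClearCheckpointData(t) => s"clearCheckpointData($t)"
  }
}
```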
For a GenerateJobs event, this calls the generateJobs method:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
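This code contains the JobGenerator's working steps:
1. Ask ReceiverTracker to allocate the data received so far, that is, assign the blocks that have arrived since the previous batch was cut to this new batch.
2. Ask DStreamGraph to produce a fresh instance of the RDD DAG. The return value of the complete DStreamGraph.generateJobs(time) traversal is a Seq[Job].
3. Submit this batch's RDD DAG from step 2, together with the meta information obtained in step 1, to JobScheduler for asynchronous execution. What is submitted is (a) the time, (b) the Seq[Job], and (c) the meta information about the block data. These three are wrapped into a JobSet, which is handed over by calling JobScheduler.submitJobSet(JobSet). Submitting to JobScheduler is decoupled from the later execution in jobExecutor, which happens asynchronously, so this step returns very quickly.
4. As soon as submission has finished (whether or not asynchronous execution has started), checkpoint the current running state of the whole system. This checkpoint is also only an asynchronously posted DoCheckpoint message, so the call returns without waiting for the checkpoint to actually be written. In brief, the checkpoint contains runtime information such as the JobSets that have been submitted but have not yet finished running.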
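The four steps can be sketched end to end as follows. This is an illustration under stated assumptions, not Spark's code: `Job`, `JobSet`, and `generateJobs` here are simplified hypothetical versions, the input info `Map(0 -> 42L)` is a made-up placeholder, and a plain fixed thread pool stands in for jobExecutor.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  final case class Job(timeMs: Long, id: Int, body: () => Unit)
  final case class JobSet(timeMs: Long, jobs: Seq[Job], inputInfo: Map[Int, Long])

  // Returns the steps taken on the generator thread and the number of jobs that ran.
  def generateJobs(timeMs: Long, failGeneration: Boolean): (List[String], Int) = {
    val steps = ArrayBuffer.empty[String]
    val executed = new AtomicInteger(0)
    val jobExecutor = Executors.newFixedThreadPool(2)

    val attempt = Try {
      steps += "allocateBlocksToBatch" // step 1: assign received blocks to this batch
      if (failGeneration) throw new RuntimeException("graph failure")
      steps += "graph.generateJobs"    // step 2: build the jobs for this batch
      Seq(0, 1).map(id => Job(timeMs, id, () => { executed.incrementAndGet(); () }))
    }
    attempt match {
      case Success(jobs) =>
        // step 3: wrap time, jobs, and input meta info into a JobSet and submit asynchronously
        val jobSet = JobSet(timeMs, jobs, Map(0 -> 42L))
        jobSet.jobs.foreach { job =>
          jobExecutor.execute(new Runnable { def run(): Unit = job.body() })
        }
        steps += "submitJobSet"
      case Failure(e) =>
        steps += s"reportError: ${e.getMessage}"
    }
    steps += "post DoCheckpoint"       // step 4: requested whether or not generation succeeded

    // Only this demo waits for the pool; the real generator returns immediately.
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
    (steps.toList, executed.get)
  }
}
```

Note that the checkpoint request sits after the match, so it is posted on both the success and the failure path, just as in the generateJobs method above.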
The overall job scheduling flow is shown in the following diagram:
[Figure: overall job scheduling flow]
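Reference blog: http://lqding.blog.51cto.com/9123978/1772958
Notes:
Source material: DT_大数据梦工厂 (DT Big Data Dream Factory)
For more exclusive content, follow the WeChat public account: DT_Spark
If you are interested in big data and Spark, you can attend the permanently free Spark open class given by 王家林 (Wang Jialin) every evening at 20:00, YY room number: 68917580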

                                            