
A Walkthrough of the DAGScheduler Source Code

2016-06-15 21:02
To understand the DAGScheduler, you first have to understand the life cycle of an RDD. What is an RDD? Its full name says it: Resilient Distributed Dataset. An RDD is a data structure, and this data structure comes with many methods, which fall into two kinds: transformations and actions. Of the two, only actions trigger a job. The commonly used actions include operators such as collect(), count(), reduce(), take() and saveAsTextFile(). Every directly implemented action triggers a job (note: some operators are implemented on top of other operators; for example, first() is implemented via take()), through a call of the form sc.runJob(this, func, ...). runJob() is a method of SparkContext, and it ultimately calls the runJob method of DAGScheduler:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
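The transformation/action split can be sketched without Spark at all. The following is a minimal illustration (assuming Scala 2.13, whose `LazyList` is fully lazy): building the pipeline does no work, and only forcing it, the analogue of an action like collect() calling sc.runJob, triggers computation. The object and its names are made up for this sketch.

```scala
object LazyVsEager {
  // Returns (work done before forcing, work done after, result).
  def run(): (Int, Int, List[Int]) = {
    var computed = 0
    // "transformation": describes the computation, runs nothing yet
    val mapped = LazyList(1, 2, 3, 4).map { x => computed += 1; x * 2 }
    val before = computed
    // "action": forces evaluation, like collect() triggering a job
    val result = mapped.toList
    (before, computed, result)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, result) = run()
    assert(before == 0)                          // no work until the action
    assert(after == 4 && result == List(2, 4, 6, 8))
    println(s"before=$before after=$after result=$result")
  }
}
```

This mirrors why only actions appear in the call chain below: a chain of transformations merely records lineage, and the action at the end is what hands that lineage to the scheduler.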
runJob() in DAGScheduler in turn calls submitJob():
submitJob(rdd, func, partitions, callSite, resultHandler, properties)
submitJob() then posts the job to the eventProcessLoop thread:
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
The class DAGSchedulerEventProcessLoop has a listening method, onReceive, which delegates to doOnReceive to handle the various event cases. Its code is as follows:
def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(...) =>
    dagScheduler.handleJobSubmitted(...)
  ...... // other unrelated cases omitted
}
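The event-loop pattern used here can be sketched in plain Scala. The following is a simplified model (all names are invented for this sketch): a dedicated thread drains a blocking queue and pattern-matches each event, just as DAGSchedulerEventProcessLoop takes posted events and dispatches them through doOnReceive.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

sealed trait SchedulerEvent
final case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent
case object StopLoop extends SchedulerEvent

object MiniEventLoop {
  // Drains posted events on a dedicated thread until StopLoop arrives,
  // dispatching each one with a pattern match, as doOnReceive does.
  def process(events: Seq[SchedulerEvent]): List[Int] = {
    val queue = new LinkedBlockingQueue[SchedulerEvent]()
    val handled = ListBuffer[Int]()
    val loop = new Thread(() => {
      var running = true
      while (running) queue.take() match {
        case JobSubmittedEvent(id) => handled += id // stands in for handleJobSubmitted
        case StopLoop              => running = false
      }
    })
    loop.start()
    events.foreach(queue.put)   // the analogue of eventProcessLoop.post(...)
    queue.put(StopLoop)
    loop.join()                 // join gives us a safe view of `handled`
    handled.toList
  }

  def main(args: Array[String]): Unit = {
    val order = process(Seq(JobSubmittedEvent(0), JobSubmittedEvent(1)))
    assert(order == List(0, 1)) // events are handled in posting order
    println(order)
  }
}
```

The design point is decoupling: callers like submitJob return immediately after posting, and all scheduling state is mutated from the single event-loop thread, avoiding locks.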
The handleJobSubmitted() method first creates the finalStage, and finally calls submitStage to submit the finalStage it has created. Its main code is:
def handleJobSubmitted(...) {
  // create the finalStage
  finalStage = new ResultStage(finalRDD, func, partitions, jobId, callSite)
  // create the job
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  // attach the newly created job to the finalStage
  finalStage.setActiveJob(job)
  // finally, submit the finalStage
  submitStage(finalStage)
  ...... // other code omitted
}
Next, let's see what the submitStage method does:
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // check whether the given finalStage has any missing parent stages
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        // all parent stages have been submitted, so call submitMissingTasks
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          // there are missing parent stages: recurse so the parents are handled first
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
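The parent-first recursion above can be modeled on a toy stage graph. The sketch below (all names invented) simplifies the real flow: it submits a stage directly once its parents are done, whereas the real scheduler parks the child in waitingStages and resubmits it when parent stages complete.

```scala
import scala.collection.mutable.ListBuffer

// Toy stage graph: each stage knows its parent stages.
final case class ToyStage(id: Int, parents: List[ToyStage])

object SubmitOrder {
  // Simplified submitStage: recurse into unsubmitted parents first
  // (sorted by id, as the real code sorts `missing`), then submit
  // the stage itself.
  def submit(stage: ToyStage, submitted: ListBuffer[Int]): Unit = {
    val missing = stage.parents.filterNot(p => submitted.contains(p.id)).sortBy(_.id)
    missing.foreach(submit(_, submitted))
    if (!submitted.contains(stage.id)) submitted += stage.id
  }

  def main(args: Array[String]): Unit = {
    val s0 = ToyStage(0, Nil)
    val s1 = ToyStage(1, Nil)
    val s2 = ToyStage(2, List(s0, s1))     // the finalStage, with two parents
    val order = ListBuffer[Int]()
    submit(s2, order)
    assert(order.toList == List(0, 1, 2))  // parents submitted before the child
    println(order.toList)
  }
}
```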
Note that the getMissingParentStages() method treats the RDD's dependencies differently depending on their type. The code is as follows:
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              // a shuffle dependency: generate a parent stage for this stage
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              // a narrow dependency: no new stage is generated
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
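The stage-cutting rule in that traversal can be isolated into a small self-contained sketch (all types and names below are invented for illustration): walk the lineage with an explicit stack; a shuffle dependency marks a stage boundary and yields a parent stage, while a narrow dependency stays inside the current stage.

```scala
import scala.collection.mutable

sealed trait Dep { def parent: DemoRdd }
final case class Narrow(parent: DemoRdd) extends Dep
final case class Shuffle(parent: DemoRdd) extends Dep
final case class DemoRdd(name: String, deps: List[Dep])

object StageBoundaries {
  // Names of the RDDs that would root a parent ShuffleMapStage.
  def shuffleParents(root: DemoRdd): List[String] = {
    val missing = mutable.LinkedHashSet[String]()
    val visited = mutable.HashSet[DemoRdd]()
    val waitingForVisit = mutable.Stack[DemoRdd](root)
    while (waitingForVisit.nonEmpty) {
      val rdd = waitingForVisit.pop()
      if (visited.add(rdd)) rdd.deps.foreach {
        case Shuffle(p) => missing += p.name        // stage boundary: new parent stage
        case Narrow(p)  => waitingForVisit.push(p)  // same stage: keep walking the lineage
      }
    }
    missing.toList
  }

  def main(args: Array[String]): Unit = {
    val src     = DemoRdd("src", Nil)
    val mapped  = DemoRdd("mapped", List(Narrow(src)))      // e.g. a map()
    val reduced = DemoRdd("reduced", List(Shuffle(mapped))) // e.g. a reduceByKey()
    val result  = DemoRdd("result", List(Narrow(reduced)))
    assert(shuffleParents(result) == List("mapped"))        // one cut, at the shuffle
    println(shuffleParents(result))
  }
}
```

This is exactly why wide (shuffle) dependencies determine the number of stages in a job, while arbitrarily long chains of narrow dependencies collapse into a single stage.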
Next, let's look at what submitMissingTasks does. Its main code is below (most of the body is omitted; only the key parts are listed):
private def submitMissingTasks(stage: Stage, jobId: Int) {
  runningStages += stage
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        val job = s.activeJob.get
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    ......
  }

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    ......
  }

  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }
      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    ......
  }

  if (tasks.size > 0) {
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
  }
}
In summary, submitMissingTasks:

1. Adds the stage to runningStages.
2. Starts the stage via the output commit coordinator, according to the stage type.
3. Finds the preferred locations of the RDD partition behind each task.
4. Serializes the task into a binary and broadcasts it to the nodes. Each task then deserializes its own copy of the binary, which guarantees isolation between tasks.
5. Creates the tasks matching the stage type: ShuffleMapTask for a ShuffleMapStage, ResultTask for a ResultStage.
6. Finally calls TaskSchedulerImpl's submitTasks method. At this point the job, having been processed by the DAGScheduler, is handed over to the TaskScheduler.
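The isolation argument in step 4 can be demonstrated with plain Java serialization (Spark actually uses its closureSerializer plus a broadcast variable; the object and helper names below are invented for this sketch): the payload is serialized once, and each "task" deserializes its own private copy, so one task mutating its copy cannot affect another.

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

object TaskBinaryDemo {
  def serialize(o: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(o)
    oos.close()
    bos.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val payload = ArrayBuffer(1, 2, 3)
    val taskBinary = serialize(payload)          // serialized once, as if broadcast
    val task1Copy = deserialize[ArrayBuffer[Int]](taskBinary)
    val task2Copy = deserialize[ArrayBuffer[Int]](taskBinary)
    task1Copy += 99                              // one task mutates its copy...
    assert(task2Copy == ArrayBuffer(1, 2, 3))    // ...the other copy is untouched
    println(task2Copy)
  }
}
```

This per-task copy is what makes it safe for task closures to reference mutable objects such as Hadoop's non-thread-safe JobConf.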

                                            