您的位置:首页 > 其它

spark2.2.0源码学习过程记录:Day5

2017-09-02 16:07 357 查看
1、读《apache spark 源码剖析》第四章第2.3节任务的创建和分发

2、源码学习

类DAGScheduler
首先看DAGScheduler类的submitMissingTasks方法,
方法中首先取得了需要计算的partitions

然后根据stage的类型(ShuffleMapStage或ResultStage)使用outputCommitCoordinator的stageStart方法。(具体是啥没研究)

然后取得taskIdToLocations,即数据的存放位置

为stage创建一个attemptid,作为已经启动的标志

为stage创建广播变量

根据partitions数量和stage类型创建tasks集合

创建任务集TaskSet并调用taskScheduler.submitTasks方法提交任务集

没找到具体的实现类,返回之前的SparkContext类的代码,发现是从这个方法(SparkContext.createTaskScheduler)创建的,看里面的代码,根据不同的master种类创建了不同的taskScheduler,我们和书中一致,来看TaskSchedulerImpl类

类TaskSchedulerImpl
通读一遍,然后看submitTasks方法
这个方中首先创建TaskSetManager类用来管理任务
调用backend.reviveOffers方法,这个backend需要再回到SparkContex勒种去确认,在这使用StandaloneSchedulerBackend

TaskSetManager
Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
each task, retries tasks if they fail (up to a limited number of times), and
handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).

类StandaloneSchedulerBackend
不在这个类里,看他的父类
类CoarseGrainedSchedulerBackend
其中DriverEndpoint的receive方法,在里面调用了makeOffers方法
方法中调用scheduler.resourceOffers分配资源
然后调用launchTasks
最终调用executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))方法将任务发送出
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: