您的位置:首页 > 大数据

大数据IMF传奇行动绝密课程第26课:Spark Runtime内幕揭秘

2016-08-12 00:12 513 查看

Spark Runtime内幕揭秘

1、再论Spark集群部署

1)从Spark Runtime的角度来讲有五大核心对象:Master、Worker、Executor、Driver、CoarseGraindExecutorBackend



2)Spark在做分布式集群系统设计的时候,最大化功能独立,模块化封装具体独立的对象、强内聚松耦合

3)当Driver中的SparkContext初始化的时候会提交程序给Master,Master如果接受该程序在Spark中运行的话,就会为当前的程序分配AppID,同时会分配具体的计算资源,需要特别注意的是,Master是根据当前提交程序的配置信息来给集群中的Worker发指令分配具体的计算资源。但是Master发出指令后并不关心具体的资源是否已经分配,转过来说Master是发指令后就记录了分配的资源,以后客户端再次提交其它的程序的话就不能够使用该资源了。其弊端是可能会导致其它要提交的程序无法分配到本来应该可以分配到的计算资源;最重要的优势在Spark分布式系统功能弱耦合的基础上最快的运行系统。(否则如果Master要等到资源最终分配成功后才通知Driver的话,就会造成Driver阻塞,不能够最大化并行计算资源的使用率)。

需要补充说明的是:Spark在默认情况下由于集群中一般都只有一个Application在运行,所以Master分配资源策略的弊端就没有那么明显了。

2、Job提交过程源码解密

1)一个非常重要的技巧是通过在spark-shell中运行一个Job来了解Job提交的过程,然后在用源码验证这个过程。

sc.textFile(“/library/dataforSortedShuffle”).flatMap(.split(” “)).map(word => (word, 1)).reduceByKey(+_).

2)在Spark中所有的Action都会触发至少一个Job,在上述代码中是通过saveAsTextFile来触发Job的。

3)SparkContext在实例化的时候会构造SparkDeploySchedulerBackend、DAGScheduler、TaskSchedulerImpl、MapOutputtrackerMaster等对象。其中SparkDeploySchedulerBachend负责集群计算资源的管理和调度,DAGScheduler负责高层调度(例如Job中Stage的划分、数据本地行等内容),TaskSchedulerImpl负责具体Stage内部的底层调度(例如具体每个Task的调度、Task的容错等),MapOutputTrackerMaster负责Shuffle中数据输出和读取的管理;

4)TaskSchedulerImpl内部的调度:

16/08/07 23:28:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/08/07 23:28:01 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2123 bytes)

这里有Worker信息,Partition信息,等等

3、Task的运行解密

1)Task是运行在Executor中,而Executor又是位于CoarseGrainedExecutorBackend中的,且CoarseGrainedExecutorBackend和Executor是一一对应的

2)当CoarseGrainedExecutorBackend接收到TaskSetManager发过来的LaunchTask消息后会反序列化TaskDescription,然后使用CoarseGrainedExecutorBackend中唯一的Executor来执行任务;

case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}


补充说明:LaunchTask是 case class


// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐