您的位置:首页 > 其它

Spark集群Job,Task 的具体运行原理

2016-07-14 21:54 495 查看
一:Spark集群部署

二:Job提交解密

三:Job生成和接受

四:Task的运行

五:再论shuffle

1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , CoarseGrainedExecutorbacked ;

2,Spark 在做分布式集群系统的设计的时候,最大化功能的独立,模块化封装具体的独立的对象,强内聚低耦合   (耦合性也称块间联系,指软件系统结构中各模块间相互联系紧密程度的一种度量。模块之间联系越紧密,其耦合性就越强,模块的独立性则越差。模块间耦合高低取决于模块间接口的复杂性、调用的方式及传递的信息。内聚性又称块内联系。指模块的功能强度的度量,即一个模块内部各个元素彼此结合的紧密程度的度量。若一个模块内各元素(语名之间、程序段之间)联系的越紧密,则它的内聚性就越高。)



3,当Driver中的sparkContext 初始化的时候会提交程序给Master,Master如果接受该程序在spark中运行的话,就会为当前程序分配AppID ,同时分配计算资源,需要特备注意的是: Master是根据当前程序的配置信息来给集群中的Worker发指令来分配具体的计算资源。但是,Master发指令后并不关心具体的计算资源是否已经分配,转过来说,Master发出指令后就记录了分配的资源,以后客户端再次提交其他程序的话就不能使用该资源啦,其弊端是可能会导致其他要提交的程序无法分配到本来应该可以分配到的计算资源。最终优势在spark分布式系统功能弱耦合的基础上最快的运行系统(否则如果Master要等到计算资源最终分配成功后才通知Driver的话,会造成Driver的阻塞,不能够最大化的并行计算资源的利用率)  (低耦合 : 不关心指令发送成功还是失败)  (快是对Driver 而言)

补充说明的是: Spark默认程序是排队的,Spark默认的情况下由于集群中一般都只有一个Application在运行,所有Master分配计算资源策略就没有那么明显啦)

二 : Job提交过程 源码解密

1,一个非常重要的技巧通过在Spark-shell 中运行一个Job来了解Job提交的过程,然后再次用源码验证。

这个过程  : 

  sc.textFile("library/data1").flatMap(_.split("")).map(word => (word,1)).reduceByKey(_+_)saveAsTextFile("/library/data2")

2,在Spark中所有的Action都会触发一个至少一个Job,在上述代码中通过savaAsTextFile来触发Job的

3.SparkContext 在实例化的时候会构造SparkDeployShedulerBackend(deploy : 配置,部署),DAGScheduler,TaskShedulerImpl(Impl : 接口),MapOutputTrackerMaster(Tracker : 追踪)等对象:

(1)SparkDeploySchedulerBackend负责集群计算资源的管理和调度。

(2)DAGScheduler : 负责高层调度(例如: Job中stage的划分,数据本地性等内容)

(3)TaskShedulerImpl : 负责具体stage内部的底层调度(例如: 每个Task的调度 ,Task容错等等)

(4)MapOutputTrackerMaster: 负责shuffle中数据的输出和读取的管理。

4,TaskSchedulerImpl内部的调度:



三:Task 的运行解密:

1,Task运行在Executor中,而Executor又是位于CoarseGrainedExecutorBackend中的且CoarseGrainedExecutorBackend和Executor是一一对应的:

2,单CoarseGrainedExecutorBackend接受到TaskSetManager发过来的LaunchTask的消息后会反序列化TaskDescription,然后使用CoarseGrainedExecutorBackend中唯一的Executor来执行任务

case LaunchTask(data) =>

if (executor == null) {

logError(“Received LaunchTask command but executor was null”)

System.exit(1)

} else {

val taskDesc = ser.deserializeTaskDescription

logInfo(“Got assigned task ” + taskDesc.taskId)

executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,

taskDesc.name, taskDesc.serializedTask)

}

发消息要么是case class 或者 case object(是唯一的)每次生成类的事例

本博客内容来自于 : 简介: 王家林:DT大数据梦工厂创始人和首席专家. 联系邮箱18610086859@126.com 电话:18610086859 QQ:1740415547 微信号:18610086859
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: