您的位置:首页 > 其它

Spark on Yarn客户端作业提交过程分析

2016-04-20 11:13 344 查看

Spark on Yarn 客户端模式作业提交过程分析

https://www.zybuluo.com/rickyChen/note/312098

我们将以一个Spark Streaming为例,阅读spark相关源码,简述Spark on Yarn客户端模式下作业提交流程。作业是通过spark-submit脚本提交的,因此整个流程从spark-submit代码开始分析。若有错误,希望各位看官指出。

通过
submit
获取提交代码的
MainClass


通过反射机制
Utils.classForName
创建相关的类,并获取其中的
mainMethod


通过反射调用直接调用上一步获得的
mainMethod
,开始运行作业的main方法

首先,新建一个
SparkConf
类,其中封装了Spark和Application相关配置信息。

SparkConf
和批处理间隔做给参数创建一个
StreamingContext


在对
StreamingContext
初始化的过程中,调用构造器,新建一个
SparkContext
类.新建
SparkContext
的过程中,有以下步骤需要关注(以下步骤按顺序执行):

JobProgressListener

作业流程监听器,可以获取整个Application运行流程中每个Stage、Job的具体信息。追踪task级别的信息,用作在UI上的展示。

createSparkEnv

SparkConf
listenerBus
为参数调用
createSparkEnv
函数。其中,
listenerBus
是spark中的监听器,包括
JobProgressListener
。在
createSparkEnv
调用的过程中,将调用
SparkEnv
对象的
createDriverEnv
成员函数,在这个过程中会创建一个
actorSystem
和一个
rpcEnv
,生成一个
driver
,这将创建一个
SparkEnv
对象,
SparkEnv
对象中将封装诸如
rpcEnv
actorSystem
cacheManager
mapOutputTracker
shuffleManager
broadcastManager
blockManager
memoryManager
等成员类,成员类的作用如下:

mapOutputTracker

跟踪一个stage map、output的位置。获取map、output的信息。driver、executor使用不同的HashMap存储元数据。

shuffleManager

shuffleManager
会在driver和每个executor中创建,我们可以通过
spark.shuffle.manager
来对shuffle进行配置,executor可以同过
shuffleManager
接口读写数据。

broadcastManager

广播变量管理器

blockManager

外部类与storage模块打交道都要通过调用
BlockManager
相应接口实现

memoryManager

内存管理器,协调运行内存和存储内存,其中运行资源负责shuffles、 joins、sorts和aggregations,存储内存负责caching和扩散。每一个executor都有一个memoryManager

SparkStatusTracker

低级API,
SparkStatusTracker
类方法将调用
JobProgressListener
类中的成员变量,
SparkStatusTracker
可以获得Application中Stage、Job的具体信息,但只提供最近几个Jobs/Stages信息。

HeartbeatReceiver

运行在
driver
上的一个类,负责接受来自
executor
的心跳信息。

createTaskScheduler

调用
createTaskScheduler
,返回两个对象:
_schedulerBackend
_taskScheduler
,并创建
_dagScheduler
DAGScheduler
初始化完成之后,将调用
_taskScheduler.start()
,这一步主要进行了:

新建一个
ClientArguements
类,封装一些Application中需要的资源相关的配置信息。

ClientArguements
为参数,新建一个
Client


调用
Client.submitApplication


调用hadoop-yarn接口初始化
yarnClient
,从集群上申请一个Application,获取Application id,判断集群是否有足够资源,否则中断。向yarn集群申请一个Container运行
ApplicationMaster
,最后把整个Application提交到Yarn集群上运行。

Reference

https://github.com/apache/spark/tree/branch-1.4

http://spark.apache.org/docs/1.4.1/running-on-yarn.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: