您的位置:首页 > 其它

Spark源码阅读笔记:DriverProgram初始化

2014-10-17 14:35 399 查看
作为代码阅读的入口,先写个最最简单的DriverProgram初始化代码作为入口,如下:

val conf= newSparkConf().setAppName("SparkTest")
val sc= newSparkContext(conf)


SparkConf

先简单描述一下SparkConf,重点还是在SparkContext,以下描述为源码描述的翻译。
SparkConf描述了sparkapplication的配置信息,用来以一系列键值对的形式来配置Spark。大多数情况下,可以以‘newSparkConf()’的方式来创建一个SparkConf对象,在new的过程中,这个对象也会将系统启动时java
properties 中以'spark'开头的配置信息加载进来。在这种情况下,通过直接操作SparkConf对象进行配置会覆盖掉系统默认配置。
如果不想自动加载系统默认信息,可以通过newSparkConf(false)来关掉这个步骤。SparkConf对象中的所有set操作都支持链式调用(比如newSparkConf().setMaster("local").setAppName("My
app")
,因为所有set方法的返回值还是个SparkConf对象)。
需要注意一点,当SparkConf对象被传递到Spark系统中时,这个对象会被克隆出去且再也不支持修改了。

SparkContext

稍微了解过一点Spark即可发现,SparkContext是整个Spark应用开发中最重要的对象之一,同时也是Spark系统中最主要的入口。一个SparkContext代表了与Spark集群的连接,可以用来在集群上创建RDD、Spark累加器(accumulators)、广播变量(broadcastvariables)。
以开头的代码为例,这里传了一个SparkConf对象进去,当然也可以利用它的其他构造函数,这里不一一列举,但目的都是为了传初始化配置进去(PS:我在进行测试的时候源码放在本地编写,在SparkConf或SparkContext构造参数中设一下master,比如spark://...或mesos://...等等,就能直接运行在集群上啦,还是很方便的)。在一系列配置相关的检查和初始化完毕后,这里主要提一下几个比较重要的成员:
1.一个LiveListenerBus线程,用于监听和接收spark事件,具体流程在以后的文档中介绍,这里先跳过。
2.一个SparkEnv对象:SparkEnv是一个十分重要的类,它为一个Spark运行实例(指master或worker)管理所有的运行时环境对象(runtimeenvrionment
objects),其中Serializer、Akkaactor系统、blockmanager、mapoutput
tracker等。其实通过它的构造器参数就能看得很明白了(与此同时也是深入了解这些子系统的好入口):
classSparkEnv (
val executorId:String,
val actorSystem:ActorSystem,
val serializer:Serializer,
val closureSerializer:Serializer,
val cacheManager:CacheManager,
val mapOutputTracker:MapOutputTracker,
val shuffleManager:ShuffleManager,
val broadcastManager:BroadcastManager,
val blockManager:BlockManager,
val connectionManager:ConnectionManager,
val securityManager:SecurityManager,
val httpFileServer:HttpFileServer,
val sparkFilesDir:String,
val metricsSystem:MetricsSystem,
val shuffleMemoryManager:ShuffleMemoryManager,
val conf:SparkConf) extendsLogging

本篇主要扯一下总的初始化流程,以后有机会会一一进行深入分析的。
在SparkContext初始化阶段,通过调用SparkEnv的create工厂方法来创建SparkEnv对象,这个create方法依照构造器所需要的参数一一构造这些子系统。
3.persistentRdds,一个用于跟踪所有持久化过的RDD的hashmap,只是这hashmap有点特殊,是org.apache.spark.util.TimeStampedWeakValueHashMap,具体特性可以看源码注释。
4.一个MetaDataCleaner线程,周期性地清理元数据。
5.一个SparkUI对象,默认4040端口访问,可以用来查看任务运行情况。注意这里的ip是driver所在的机器的Ip,不是master的ip。
6.TaskSchedular,spark任务调度器。调用action方法后,任务以一个job的形式被提交上去,经DAGSchedular计算划分stage后,形成一个个stage,实际上就是一个task集合,而这些task集合会被DAGSchedular提交给TaskSchedular。此后TaskSchedular会向ClusterManager(standalone模式的master/Yarn/Mesos)要求在各个节点上提供资源。
7.HeatbeatReceiver,用来接收executor发过来的心跳消息。
8.DAGSchedular,这是一个stage层面的调度器,每次调用一个action时,即产生一个job,DAGSchedular就会将其转换为一个由stage组成的有向无环图,并且会追踪那些被物化的rdd和stage输出,将调度最小化。然后,DAGSchedular会将stage以taskset的形式提交到TaskSchedular。除了计算stage的有向无环图以外,DAGSchedular也会根据当前的cache状态(即cache数据的位置)来决定每个task优先运行的节点,并同样会把这些信息提交给TaskSchedular。除此之外,DAGSchedular也会处理由于shuffle动作输出数据丢失导致的异常(shuffle是划分stage的标志之一,如果stage中的任务由于上一个stage的shuffle结果数据丢失导致失败,按照spark以lineage做容错的手段,之前的stage会被重做以重新产生数据来完成当前的stage,这一流程也由DAGSchedular来控制,这属于发生在stage之间的错误)。但是如果是stage内部的任务发生错误而不是由于shuffle动作导致的异常,这就会交给TaskSchedular来管理,TaskSchedular会进行重试。

整个初始化过程说白了就是这些核心成员的初始化过程,本文只是很粗略地分析了一下Driver端的启动流程介绍了一下SparkContext的关键成员。个人认为Spark的API设计得真心很干净,非常清楚,并且注释十分详尽,为像我这样的新手学习提供了很大的方便。那么在之后的文档中将以SparkContext的几个核心成员来一步一步深入Spark的核心机制。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: