Spark内核介绍:Spark在运行时会把Stage包装成任务提交 (二)
2016-03-17 19:22
483 查看
Spark在运行时会把Stage包装成任务提交,有父Stage的Spark会先提交父Stage。弄清楚了Spark划分计算的原理,我们再结合源码 看一看这其中的过程。下面的代码是DAGScheduler中的得到一个RDD父Stage的函数,可以看到宽依赖为划分Stage的边界。
上面提到Spark的计算是从RDD调用action操作时候触发的,我们来看一个action的代码
RDD的collect方法是一个action操作,作用是将RDD中的数据返回到一个数组中。可以看到,在此action中,会触发Spark上下文环境SparkContext中的runJob方法,这是一系列计算的起点。
SparkContext拥有DAGScheduler的实例,在runJob方法中会进一步调用DAGScheduler的runJob方法。在此 时,DAGScheduler会生成DAG和Stage,将Stage提交给TaskScheduler。TaskSchduler将Stage包装成 TaskSet,发送到Worker节点进行真正的计算,同时还要监测任务状态,重试失败和长时间无返回的任务。
我们可以通过RDD的toDebugString来查看其递归的依赖信息,图6展示了在spark shell中通过调用这个函数来查看wordCount RDD的依赖关系,也就是它的Lineage.
图6 RDD wordCount的lineage
如果发现Lineage过长或者里面有被多次重复使用的RDD,我们就可以考虑使用cache机制或checkpoint机制了。
我们可以通过在程序中直接调用RDD的cache方法将其保存在内存中,这样这个RDD就可以被多个任务共享,避免重复计算。另外,RDD还提供了 更为灵活的persist方法,可以指定存储级别。从源码中可以看到RDD.cache就是简单的调用了 RDD.persist(StorageLevel.MEMORY_ONLY)。
同样,我们可以调用RDD的checkpoint方法将其保存到磁盘。我们需要在SparkContext中设置checkpoint的目录,否则调用会 抛出异常。值得注意的是,在调用checkpoint之前建议先调用cache方法将RDD放入内存,否则将RDD保存到文件的时候需要重新计算。
Cache机制和checkpoint机制的差别在于cache将RDD保存到内存,并保留Lineage,如果缓存失效RDD还可以通过Lineage重建。而checkpoint将RDD落地到磁盘并切断Lineage,由文件系统保证其重建。
SparkContext的进程就成为了driver角色,上一节提到的DAGScheduler和TaskScheduler都在driver中运行。 Spark程序在提交时要指定master的地址,这样可以在程序启动时向master申请worker的计算资源。Driver,master和 worker之间的通信由Akka支持。Akka 也使用 Scala 编写,用于构建可容错的、高可伸缩性的Actor 模型应用。关于Akka,可以访问其官方网站进行进一步了解,本文不做详细介绍。
图7 Spark任务部署
上面提到Spark的计算是从RDD调用action操作时候触发的,我们来看一个action的代码
RDD的collect方法是一个action操作,作用是将RDD中的数据返回到一个数组中。可以看到,在此action中,会触发Spark上下文环境SparkContext中的runJob方法,这是一系列计算的起点。
SparkContext拥有DAGScheduler的实例,在runJob方法中会进一步调用DAGScheduler的runJob方法。在此 时,DAGScheduler会生成DAG和Stage,将Stage提交给TaskScheduler。TaskSchduler将Stage包装成 TaskSet,发送到Worker节点进行真正的计算,同时还要监测任务状态,重试失败和长时间无返回的任务。
2.3 RDD的缓存与容错
上文提到,Spark的计算是从action开始触发的,如果在action操作之前逻辑上很多transformation操作,一旦中间发生计 算失败,Spark会重新提交任务,这在很多场景中代价过大。还有一些场景,如有些迭代算法,计算的中间结果会被重复使用,重复计算同样增加计算时间和造 成资源浪费。因此,在提高计算效率和更好支持容错,Spark提供了基于RDDcache机制和checkpoint机制。我们可以通过RDD的toDebugString来查看其递归的依赖信息,图6展示了在spark shell中通过调用这个函数来查看wordCount RDD的依赖关系,也就是它的Lineage.
图6 RDD wordCount的lineage
如果发现Lineage过长或者里面有被多次重复使用的RDD,我们就可以考虑使用cache机制或checkpoint机制了。
我们可以通过在程序中直接调用RDD的cache方法将其保存在内存中,这样这个RDD就可以被多个任务共享,避免重复计算。另外,RDD还提供了 更为灵活的persist方法,可以指定存储级别。从源码中可以看到RDD.cache就是简单的调用了 RDD.persist(StorageLevel.MEMORY_ONLY)。
同样,我们可以调用RDD的checkpoint方法将其保存到磁盘。我们需要在SparkContext中设置checkpoint的目录,否则调用会 抛出异常。值得注意的是,在调用checkpoint之前建议先调用cache方法将RDD放入内存,否则将RDD保存到文件的时候需要重新计算。
Cache机制和checkpoint机制的差别在于cache将RDD保存到内存,并保留Lineage,如果缓存失效RDD还可以通过Lineage重建。而checkpoint将RDD落地到磁盘并切断Lineage,由文件系统保证其重建。
2.4 Spark任务的部署
Spark的集群部署分为Standalone、Mesos和Yarn三种模式,我们以Standalone模式为例,简单介绍Spark程序的部 署。如图7示,集群中的Spark程序运行时分为3种角色,driver, master和worker(slave)。在集群启动前,首先要配置master和worker节点。启动集群后,worker节点会向master节 点注册自己,master节点会维护worker节点的心跳。Spark程序都需要先创建Spark上下文环境,也就是SparkContext。创建SparkContext的进程就成为了driver角色,上一节提到的DAGScheduler和TaskScheduler都在driver中运行。 Spark程序在提交时要指定master的地址,这样可以在程序启动时向master申请worker的计算资源。Driver,master和 worker之间的通信由Akka支持。Akka 也使用 Scala 编写,用于构建可容错的、高可伸缩性的Actor 模型应用。关于Akka,可以访问其官方网站进行进一步了解,本文不做详细介绍。
图7 Spark任务部署
相关文章推荐
- LayoutInflater 填充器填充布局,布局属性失效问题
- PB中,在用grid形式的数据窗口中,跨列标题头的制作方法
- MyBatis3:SQL映射
- c++类的基础
- Linux查看物理CPU个数、核数、逻辑CPU个数
- Makefile 学习日记(五)——makefile中的函数
- C++ 实现比较版本号
- Spark内核讲解之弹性分布数据集(一)
- 100万并发连接服务器笔记之Java Netty处理1M连接会怎么样
- uva10825 (暴力)
- LeetCode minDepth of 2 bin tree
- 第二周问题汇总
- mengento 数据库模型
- 404、500、502等HTTP状态码介绍
- android 内存处理工具
- C++ STL stack/queue
- java当中this应该怎么理解?
- 好文记载
- 消息系统Kafka介绍
- 100万并发连接服务器笔记之Erlang完成1M并发连接目标