您的位置:首页 > 其它

Spark之任务流程和角色

2017-04-29 16:38 357 查看

一、各个角色解释

Application 基于Spark的应用程序,包含了driver程序和 集群上的executor

Driver Program 运⾏行main函数并且新建SparkContext的 程序

Cluster Manager 在集群上获取资源的外部服务(例如 standalone,Mesos,Yarn )

Worker Node 集群中任何可以运⾏行应⽤用代码的节点

Executor是在⼀一个worker node上为某应⽤用启动的⼀一 个进程,该进程负责运⾏行任务,并且负责将数据存在内存或 者磁盘上。每个应⽤用都有各⾃自独⽴立的executor

Task 被送到某个executor上的工作单元



Job 包含很多任务的并行计算,可以看做和Spark的action 对应。一个Action操作产生一个Job

Stage⼀一个Job会被拆分很多组任务,每组任务被称为 Stage(就像Mapreduce分map任务和reduce任务⼀一样)。一个transformation操作划分一个Stage, stage的划分是通过宽依赖。在下图中,我们可以看到:



二、Spark任务调度器

2.1、调度器根据RDD的结构信息为每个Action操作确定有效的执行计划 。调度器的接口是runJob函数,参数为RDD及其分区集,和 一个RDD分区上的函数。该接口足以表示Spark中的所有动作 (即count、collect、save等)。

2.2、总的来说,我们的调度器跟Dryad类似,但我们还考虑了哪些 RDD分区是缓存在内存中的。调度器根据目标RDD的Lineage 图创建一个由 stage构成的有向无环路图(DAG)。每个 stage内部尽可能多地包含一组具有窄依赖关系的转换,并将 它们流水线并行化(pipeline)。



 stage的边界有两种情况:
  一是宽依赖上的Shuffle操作;
  二是已缓存分区,它可以缩短 父RDD的计算过程。
父RDD完成计算后,可以在 stage内启动一组任务计算丢失的分区。

三、任务调度, 下图为流程图



3.1、DAG Scheduler

• 基于Stage构建DAG,决定每个任务的最佳位置

• 记录哪个RDD或者Stage输出被物化

• 将taskset传给底层调度器TaskScheduler

• 重新提交shuffle输出丢失的stage

3.2、Task Scheduler

• 提交taskset(⼀一组task)到集群运⾏行并汇报结果

• 出现shuffle输出lost要报告fetch failed错误

• 碰到straggle任务需要放到别的节点上重试

• 为每一个TaskSet维护一个TaskSetManager(追踪本地性 及错误信息)

四、任务的调度



五、WordCount案例

我们在sparkshell中运行一下最简单的例子,在spark-shell中 输入如下代码
用于统计在README.md中含有Spark的行数有多少

scala>sc.textFile("README.md").filter(_.contains("Spark") ).count


scala> val lines = sc.textFile("abc.log")

scala> val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

scala> result.toDebugString
res4: String =
(1) ShuffledRDD[13] at reduceByKey at <console>:29 []
+-(1) MapPartitionsRDD[12] at map at <console>:29 []
|  MapPartitionsRDD[11] at flatMap at <console>:29 []
|  abc.log MapPartitionsRDD[7] at textFile at <console>:27 []
|  abc.log HadoopRDD[6] at textFile at <console>:27 []

scala>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark