您的位置:首页 > 其它

Spark体系概况

2017-09-25 14:31 155 查看

一、Spark来源

Spark依托于Hadoop,原有的Hadoop在刚发布是非常完善的,但随着对Hadoop的深入使用,发现存在许多问题。Hadoop最早的版本是MRv1版本,任务的提交和调度如下。



可知Hadoop主要分成3个主要部分,HDFS提供数据源,JobTrack负责任务的资源分配管理以及任务的调度,TaskTrack主要负责任务的执行。当集群任务较多,JobTrack会出现负载过重,容易引起一系列问题,JobTrack不仅承担资源分配管理,还要对任务进行调度,容易引发单点故障,没有备份的节点。问题具体表现在下面:

可扩展性差:在运行时,JobTracker既负责资源管理又负责任务调度,当集群繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题

可用性差:采用了单节点的Master,没有备用Master及选举操作,这导致一旦Master出现故障,整个集群将不可用。

资源利用率低:TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,Hadoop 调度器负责将各个TaskTracker 上的空闲slot 分配给Task 使用。一些Task并不能充分利用slot,而其他Task也无法使用这些空闲的资源。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用。有时会因为作业刚刚启动等原因导致MapTask很多,而Reduce Task任务还没有调度的情况,这时Reduce slot也会被闲置。

不能支持多种MapReduce框架:无法通过可插拔方式将自身的MapReduce框架替换为其他实现,如Spark、Storm等。

Hadoop为了解决这些问题MapRev2版本,将JobTrack拆分成资源管理(Resource Manage, RM)和任务调度(ApplicationMaster ,AM),这样可扩展性更强,RM既可以使用自身的资源管理。MapRev2版本map/reduce模型也可以替换成Spark、storm等。Hadoop MapRev2版本中间结果保存需要频繁读取hdfs,io读取影响处理速度。Spark直接基于内存模式,中间数据放内存,读取效率高,Spark支持DAG图的分布式并行计算的编程框架,减少了迭代过程中数据的落地,提高了处理效率。Spark更加通用。不像Hadoop只提供了Map和Reduce两种操作,Spark提供的数据集操作类型有很多种,大致分为:Transformations和Actions两大类。Transformations包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort和PartionBy等多种操作类型,同时还提供Count, Actions包括Collect、Reduce、Lookup和Save等操作。另外各个处理节点之间的通信模型不再像Hadoop只有Shuffle一种模式,用户可以命名、物化,控制中间结果的存储、分区等。

Spark优势体现在:

易于使用。Spark现在支持Java、Scala、Python和R等语言编写应用程序,大大降低了使用者的门槛。自带了80多个高等级操作符,允许在Scala,Python,R的shell中进行交互式查询。

支持查询。Spark支持SQL及Hive SQL对数据查询。

支持流式计算。与MapReduce只能处理离线数据相比,Spark还支持实时的流计算。Spark依赖Spark Streaming对数据进行实时的处理,其流式处理能力还要强于Storm。

可用性高。Spark自身实现了Standalone部署模式,此模式下的Master可以有多个,解决了单点故障问题。此模式完全可以使用其他集群管理器替换,比如YARN、Mesos、EC2等。

丰富的数据源支持。Spark除了可以访问操作系统自身的文件系统和HDFS,还可以访问Cassandra, HBase, Hive, Tachyon以及任何Hadoop的数据源。这极大地方便了已经使用HDFS、Hbase的用户顺利迁移到Spark。

Spark适用场景:

目前大数据处理场景有以下几个类型:

复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;

基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间

基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

目前对以上三种场景需求都有比较成熟的处理框架,第一种情况可以用Hadoop的MapReduce来进行批量海量数据处理,第二种情况可以Impala进行交互式查询,对于第三中情况可以用Storm分布式处理框架处理实时流式数据。以上三者都是比较独立,各自一套维护成本比较高,而Spark的出现能够一站式平台满意以上需求。 Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。

需要反复操作的次数越- 多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合

数据量不是特别大,但是要求实时统计分析需求

2、Spark生态圈



3、Spark基本概念

术语描述
ApplicationSpark的应用程序,包含一个Driver program和若干Executor
SparkContextSpark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor
Driver Program运行Application的main()函数并且创建SparkContext
Executor是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请各自的Executor来处理任务
Partition数据分区。即一个RDD的数据可以划分为多少个分区
Cluster Manager在集群上获取资源的外部服务(例如:Standalone、Mesos、Yarn)
Worker Node集群中任何可以运行Application代码的节点,运行一个或多个Executor进程
Task具体执行任务。Task分为ShuffleMapTask和ResultTask两种。ShuffleMapTask和ResultTask分别类似于Hadoop中的Map,Reduce
JobSparkContext提交的具体Action操作,常和Action对应。用户提交的作业,一个Job可能由一到多个Task组成
Stage每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet。Job分成的阶段,一个Job可能被划分为一到多个Stage
RDD是Resilient distributed datasets的简称,中文为弹性分布式数据集;是Spark最核心的模块和类
DAGScheduler根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler
TaskScheduler将Taskset提交给Worker node集群运行并返回结果
Transformations是Spark API的一种类型,Transformation返回值还是一个RDD,所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的
Action是Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合;计算只有在Action被提交的时候计算才被触发。
NarrowDependency窄依赖。即子RDD依赖于父RDD中固定的Partition。NarrowDependency分为OneToOneDependency和RangeDependency两种。
ShuffleDependencyshuffle依赖,也称为宽依赖。即子RDD对父RDD中的所有Partition都有依赖。

Spark模型:



用户使用SparkContext提供的API(常用的有textFile、sequenceFile、runJob、stop等)编写Driver application程序。此外SQLContext、HiveContext及StreamingContext对SparkContext进行封装,并提供了SQL、Hive及流式计算相关的API。

使用SparkContext提交的用户应用程序,首先会使用BlockManager和BroadcastManager将任务的Hadoop配置进行广播。然后由DAGScheduler将任务转换为RDD并组织成DAG,DAG还将被划分为不同的Stage。最后由TaskScheduler借助ActorSystem将任务提交给集群管理器(Cluster Manager)。

集群管理器(ClusterManager)给任务分配资源,即将具体任务分配到Worker上,Worker创建Executor来处理任务的运行。Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。

Spark on yarn



RDD

RDD概念

RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join。

RDD目标

多个并行操作重用中间结果

机器学习和图应用中常用的迭代算法(每一步对数据执行相似的函数)

交互式数据挖掘工具(用户反复查询一个数据子集)

容错性

分布式数据集的容错性有两种方式:即数据检查点和记录数据的更新。我们面向的是大规模数据分析,数据检查点操作成本很高:需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源(在内存中复制数据可以减少需要缓存的数据量,而存储到磁盘则会拖慢应用程序)。所以,我们选择记录更新的方式。但是,如果更新太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列转换记录下来(即Lineage),以便恢复丢失的分区。

RDD描述

每个RDD都包含:(1)一组RDD分区(partition,即数据集的原子组成部分);(2)对父RDD的一组依赖,这些依赖描述了RDD的Lineage;(3)一个函数,即在父RDD上执行何种计算;(4)元数据,描述分区模式和数据存放的位置。例如,一个表示HDFS文件的RDD包含:各个数据块的一个分区,并知道各个数据块放在哪些节点上。而且这个RDD上的map操作结果也具有同样的分区,map函数是在父数据上执行的

RDD依赖以及容错

设计接口的一个关键问题就是,如何表示RDD之间的依赖。我们发现RDD之间的依赖关系可以分为两类

窄依赖(narrow dependencies):子RDD的每个分区依赖于常数个父分区(即与数据规模无关);

宽依赖(wide dependencies):子RDD的每个分区依赖于所有父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)



区分这两种依赖很有用。首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

调度器和分区

调度器根据目标RDD的Lineage图创建一个由stage构成的无回路有向图(DAG)。每个stage内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化(pipeline)。stage的边界有两种情况:一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短父RDD的计算过程。例如图6。父RDD完成计算后,可以在stage内启动一组任务计算丢失的分区。



Spark怎样划分任务阶段(stage)的例子。实线方框表示RDD,实心矩形表示分区(黑色表示该分区被缓存)。要在RDD G上执行一个动作,调度器根据宽依赖创建一组stage,并在每个stage内部将具有窄依赖的转换流水线化(pipeline)。 本例不用再执行stage 1,因为B已经存在于缓存中了,所以只需要运行2和3

调度器根据数据存放的位置分配任务,以最小化通信开销。如果某个任务需要处理一个已缓存分区,则直接将任务分配给拥有这个分区的节点。否则,如果需要处理的分区位于多个可能的位置(例如,由HDFS的数据存放位置决定),则将任务分配给这一组节点。对于宽依赖(例如需要Shuffle的依赖),目前的实现方式是,在拥有父分区的节点上将中间结果物化,简化容错处理,这跟MapReduce中物化map输出很像。如果某个任务失效,只要stage中的父RDD分区可用,则只需在另一个节点上重新运行这个任务即可。如果某些stage不可用(例如,Shuffle时某个map输出丢失),则需要重新提交这个stage中的所有任务来计算丢失的分区

梳理一下Spark中关于并发度涉及的几个概念File,Block,Split,Task,Partition,RDD以及节点数、Executor数、core数目的关系



输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block。当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

每个节点可以起一个或多个Executor。

每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。

每个Task执行的结果就是生成了目标RDD的一个partiton。

注意: 这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。

而 Task被执行的并发度 = Executor数目 * 每个Executor核数。

至于partition的数目:对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。在Map阶段partition数目保持不变。在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。

参考博客:

http://blog.csdn.net/u012050154/article/details/50503129

http://blog.csdn.net/u010429765/article/details/58035645

http://blog.csdn.net/lovehuangjiaju/article/details/48580863
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark