您的位置:首页 > 其它

[Berkeley]弹性分布式数据集RDD的介绍(RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 论文翻译)

2015-08-21 09:36 411 查看
摘要:
    本文提出了分布式内存抽象的概念——弹性分布式数据集(RDD,Resilient Distributed Datasets),它允许开发人员在大型集群上执行基于内存的计算。RDD适用于两种应用,而现有的数据流系统对这两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD在共享状态的时候是基于粗粒度的转换而不是细粒度的更新(换句话说就是,RDD只能通过其他RDD上的批量操作来创建)。尽管如此,RDD仍然足以用于很多类型的计算,包括专用的迭代编程模型(如Pregel)等。我们已经将RDD应用到Spark系统之中,并对改性进行了相关的评估。

介绍:
    像MapReduce和Dryad这样的集群计算框架已经被广泛地应用到了大规模的数据分析中。这些系统提供了高级的运算符(算子),使得用户能够轻松地写出并行计算的程序而不用考虑任务分配问题以及错误容忍的细节。
    尽管目前的框架都提供使用集群计算资源的高级抽象,但是使用分布式内存却没有对应的类似的高级抽象。如果不能很好地使用内存,那么,当一些应用的数据结果需要多次复用的情况出现的时候,执行效率就会受到影响,这些应用一般包括:
(1)迭代式机器学习和图应用中常用的迭代算法(每一步对数据执行相似的函数),比如pagerank、K-means聚合、逻辑回归等算法;
(2)交互式数据挖掘工具(用户反复查询一个数据子集)。

   当前的框架采取的方法是将需要复用的数据存储到存储系统中(HDFS等),然后在每次查询时重新加载,那么,在数据复制、磁盘I/O、序列化的过程中会产生较大的开销,而这些开销会支配整个任务执行的时间。
    针对这个问题,我们提出了RDD,它可以为大部分的应用提供高效的数据复用。RDD是一个具有容错机制,并行数据架构,可以让用户将中间结果保存在内存中,并通过控制RDD的数据分区进而优化数据部署,还提供丰富的算子来操纵RDDs。

    就错误容忍而言,现在的集群系统采取的方式一般是提供细粒度的更新操作:数据备份或者是通过log处理进行数据恢复。对于数据密集型的任务来说,因为要进行大量的数据拷贝,network的带宽压力很大(network的带宽小于RAM(内存cache)的带宽),且文件系统负载也会加大。

    相比而言,RDD提供了一种粗粒度的转换方式来解决容物容忍的问题。RDD通过Lineage(包含了如何从其他RDD衍生所必需的相关信息),从而不需要检查点操作就可以重构丢失的数据分区。
    我们通过微基准和用户应用程序来评估RDD。实验表明,在处理迭代式应用上Spark比Hadoop快高达20多倍,计算数据分析类报表的性能提高了40多倍,同时能够在5-7秒的延时内交互式扫描1TB数据集。此外,我们还在Spark之上实现了Pregel和HaLoop编程模型(包括其位置优化策略),以库的形式实现(分别使用了100和200行Scala代码)。

RDDs

RDD Abstraction抽像
    RDD是只读的、分区记录的集合。RDD只能通过稳定物理存储中的数据集和其他已有的RDD上的确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join等。    RDD始终不需要物化。RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage信息),据此可以从物理存储的数据计算出相应的RDD分区。

    最后要说明的是,用户可以控制的RDDs的其他两个方面,持久化和划分:

    (1)用户可以指定需要复用的RDDs并选择一个基于内存的存储策略。用户可以请求将RDD缓存,这样运行时将已经计算好的RDD分区存储起来,以加速后期的重用。相关策略:缓存的RDD默认存储在内存中,但如果内存不够,可以写到磁盘上;用户也可以使用其他的存储策略,比如仅仅将RDD存储在磁盘等等;用户还可以在每个RDD上面指定优先级来确定最先spill到磁盘的RDD。
    (2)用户可以就RDD每条记录里面的key进行划分。这有助于部署的优化,例如,我们可以将两个数据集用同样的方式进行 hash-partitioned以便于之后的join操作(事先经过key进行了划分)。

Spark Programming Interface

    在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)的调用在其上进行转换操作。

    程序员一开始通过在存储层上的数据“转换”(map、filter等)定义一个或者多个RDDs,即transformations算子操作过程。之后,程序员就可以在“动作”(actions)中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在RDD上第一次执行actions 操作时,才会真正开始计算RDD(即延迟计算)。

    【补充】每个运算(如flatMap、map)其实返回的都是一个RDD对象,每个RDD对象都有一个Parent,通过这个Parent,实际上我们把一个个RDD对象串联了起来!可以认为最后形成了一个RDD对象的队列;直到最后需要计算时(调用了action算子,后调用runjob函数)才开始逐一调用各个RDD对象的compute方法,完成实际的运算。
    

    Example: Console Log Mining(控制台的日志挖掘)

    本部分我们通过一个具体示例来阐述RDD。假定有一个大型网站出错,操作员想要检查Hadoop文件系统(HDFS)中的日志文件(TB级大小)来找出原因。通过使用Spark,操作员只需将日志中的错误信息装载到一组节点的内存中,然后执行交互式查询即可。
    首先,需要在Spark解释器中输入如下Scala命令:
<strong>lines </strong>= spark.textFile("hdfs://...")    errors = lines.filter(_.startsWith("ERROR"))
errors.cache()


     第1行从HDFS文件定义了一个RDD(即一个文本行集合),第2行获得一个过滤后的RDD,第3行请求将errors缓存起来。注意在Scala语法中filter的参数是一个闭包。

    errors.count()

    
    用户还可以在RDD上执行更多的转换操作,并使用转换结果,如
<span style="font-size:18px;">    // Count errors mentioning MySQL
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()</span>


    使用errors的第一个action运行以后,Spark会把errors的分区缓存在内存中(action之后才会将数据加载到内存),极大地加快了后续计算速度。注意,最初的RDD
lines因为错误信息可能只占原数据集的很小一部分(小到足以放入内存)。
    最后,为了说明模型的容错性,下图给出了上述查询过程的的Lineage图。在RDD lines上执行filter操作,得到errors,然后再filter、map后得到新的RDD,在这个RDD上执行collect操作。Spark调度器以流水线的方式执行后两个转换,向拥有errors分区缓存的节点发送一组任务。此外,如果某个errors分区丢失,Spark只在相应的lines分区上执行filter操作(“血统”信息记录操作过程)来重建该errors分区。
    


Advantages of the RDD Model(分布式共享内存)
    为了进一步理解RDD作为一种分布式的内存抽象的优势,第一个优势见下表。表1列出了RDD与分布式共享内存(DSM,Distributed Shared Memory)的对比。

    


    在DSM系统中,应用可以向全局地址空间的任意位置进行读写操作。(注意这里的DSM,不仅指传统的共享内存系统,还包括那些通过分布式哈希表或分布式文件系统进行细粒度的数据共享的系统,比如Piccolo)DSM是一种通用的抽象,但这种通用性同时也使得在集群上实现有效的容错性更加困难。

    RDD与DSM主要区别在于,RDD只能通过粗粒度的批量转换来创建(即“写”)。然而,DSM可以对任意内存位置读写。也就是说,RDD限制应用只能执行批量写操作,这样有利于实现有效的容错。特别地,RDD没有设置检查点的开销,因为可以使用Lineage来恢复RDD。而且,失效时只需要重新计算丢失的那些RDD分区,可以在不同节点上并行执行,而不需要回滚整个程序。
    【注意】通过内存中的任务备份,RDD还可以处理落后任务(即运行很慢的节点),这点与MapReduce类似。而DSM则难以实现备份任务,因为任务及其副本都需要读写同一个内存位置。
    与DSM相比,RDD模型有两个好处。第一,对于RDD中的批量操作,运行时将根据数据存放的位置来调度任务,从而提高性能。第二,对于基于扫描的操作,如果内存不足以缓存整个RDD,就进行部分缓存。把内存放不下的分区存储到磁盘上,此时性能与现有的数据并行系统差不多。

    最后看一下读操作的粒度。RDD上的很多动作(如count和collect)都是批量读操作,即扫描整个数据集,可以将任务分配到距离数据最近的节点上。同时,RDD也支持细粒度操作,即在哈希或范围分区的RDD上执行关键字查找。

Applications Not Suitable for RDDs

    RDD可以很好的支持数据并行的批量迭代分析应用,且拥有很好的容错机制。RDD不太适合那些异步更新共享状态的应用,例如并行web爬行器。因此,我们的目标是为大多数批量分析型应用提供有效的编程模型,此外类型的应用交给专门的系统来处理。

Spark Programming Interface

    Spark用Scala语言实现了RDD的API。Scala是一种基于JVM的静态类型、函数式、面向对象的语言。我们选择Scala是因为它简洁(特别适合交互式使用)、有效(因为是静态类型)。但是,RDD抽象并不局限于函数式语言
    为了使用Spark,开发者需要编写一个driver程序,连接到集群以运行Worker,如下图所示。Driver定义了一个或多个RDD,并调用RDD上的动作。Worker是长时间运行的进程,将RDD分区以Java对象的形式缓存在内存中。
    


    再看看上面对log文件信息进行操作的例子,用户执行RDD操作时会提供参数,比如map会传递一个闭包(closure,函数式编程中的概念)。Scala将闭包表示为Java对象,这些对象在传递的时候被序列化,通过网络传输到其他节点上进行装载。Scala将闭包内的变量保存为Java对象的字段。例如,var
x = 5; rdd.map(_ + x) 这段代码将RDD中的每个元素加5。

    RDD本身是静态类型对象,由参数指定其元素类型。例如,RDD[int]是一个整型RDD。不过,我们举的例子几乎都省略了这个类型参数,因为Scala支持类型推断。

    虽然在概念上使用Scala实现RDD很简单,但还是要处理一些Scala闭包对象的反射问题,我们也需要解决如何通过Scala解释器来使用Spark还需要更多工作。不管怎样,我们都不需要修改Scala编译器。

 RDD Operations in Spark

    下表列出了Spark中主要的RDD transformations和actions操作。每个操作都给出了标识,其中方括号表示类型参数。如下图:

    


    【注意】有些操作只对键值对可用,比如join。另外,函数名与Scala及其他函数式语言中的API匹配,例如map是一对一的映射,而flatMap是将每个输入映射为一个或多个输出(与MapReduce中的map类似)。
    除了上述这些操作以外,用户还可以请求将RDD缓存起来。而且,用户还可以通过Partitioner类获取RDD的分区顺序,然后将另一个RDD按照同样的方式分区。还有些操作会自动产生一个哈希或范围分区的RDD,像groupByKey,reduceByKey和sort等。

Example Applications

    现在我们讨论一些迭代应用: logistic regression(逻辑归约)和PageRank.。

    
    Logistic Regression(逻辑规约)
    很多机器学习算法都具有迭代特性,运行迭代优化方法来优化某个目标函数,例如梯度下降方法。如果这些算法的工作集能够放入内存,将极大地加速程序运行。
    例如下面的程序是逻辑回归(一种常见的分类算法)的实现,该程序寻找一个最佳分割两组点(即垃圾邮件和非垃圾邮件)的超平面w。算法采用梯度下降的方法:开始时w为随机值,在每一次迭代的过程中,对w的函数求和,然后朝着优化的方向移动w。

<span style="font-size:18px;">val points = spark.textFile(...)
.map(parsePoint).persist()
var w =   // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}</span>


    上述程序首先定义一个名为points的缓存RDD,这是在文本文件上执行map转换之后得到的,即将每个文本行解析为一个Point对象。然后在points上反复执行map和reduce操作,每次迭代时通过对当前w的函数进行求和来计算梯度。之后的小节我们将看到这种在内存中缓存points的方式,比每次迭代都从磁盘文件装载数据并进行解析要快20X。

    已经在Spark中实现的迭代式机器学习算法还有:kmeans(像逻辑回归一样每次迭代时执行一对map和reduce操作),期望最大化算法(EM,两个不同的map/reduce步骤交替执行),交替最小二乘矩阵分解和协同过滤算法。

PageRank

    该算法通过合计链接到自身页面的page对其的贡献,进而迭代式地更新自身的排名。每一次的迭代中,每一个文件(page)都将r/m的贡献发送给他的邻居,其中r是它的rank,n是它邻居页面的的数目,之后,它会将自身的排名更新为α/N + (1 − α)∑ci,其中∑ci是是当前页面获取到的贡献的总和,N是文件(page)的总数。在Spark中,我们可以这样实现PageRank:

<span style="font-size:18px;">// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks =    // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}</span>


    该程序对应的“血统”图如下:

    


    在上图中,每一次的迭代,我们都会在之前的迭代过程以及静态的links数据集的基础上创建新的ranks数据集。这个图的一个有趣的功能是它的规模会随着迭代的次数增长。因此,在一个有很多次迭代的job中,对ranks进行复制备份进而减少错误恢复的次数也是很必要的。然而,需要注意的是links数据集不需要进行复制备份,因为links的恢复可以通过“血统”来高效地执行(对文件进行map操作)。links的数据集会比ranks大很多,因为很多文件都有很多link但是,一个文件仅有一个rank。

    最后一点,我们可以通过控制RDD的划分来优化PageRank处理过程中的交互。如果我们指明了links的划分(hash-partition)。我们可以用相同的方法划分ranks的数据集并且可以确定links和ranks之间的join操作不需要交互(因为每一个URL的link和它的link列表都在同一个机器(因为hash-partition))。我们也可以写一个自定义的Partitioner类来聚集相互链接的pages(比如,通过域名划分)。上述的每种优化过程都可以在定义links的时候通过调用partitionBy来进行:

<span style="font-size:18px;">    links = spark.textFile(...).map(...)
.partitionBy(myPartFunc).persist()</span>


    通过上述的调用,links和ranks之间的join操作将会自动地获取到来自其他page对当前页面的贡献并重新计算当前页面的rank值。

Representing RDDs

    提供RDD作为抽象的一个挑战是针对通过大范围的转换进行跟踪的“血统”的表示方式的选择。理想的话,一个执行RDDs相关操作的系统应该尽可能多的提供转换算子,并且允许用户任意组合这些算子。关于上述的挑战,一个简单的基于图的RDDs表示法有助于目标的完成。在Spark中已经开始使用。
    概括来说,每个RDD需要提供包含四种类型信息的接口:

    (1)一组RDD分区(partition,即数据集的原子组成部分);
    (2)对父RDD的一组依赖,这些依赖描述了RDD的Lineage;
    (3)一个函数,即在父RDD上执行何种计算;
    (4)元数据,描述分区模式和数据存放的位置。例如,一个表示HDFS文件的RDD包含:各个数据块的一个分区,并知道各个数据块放在哪些节点上。而且,这个RDD上的map操作结果也具有同样的分区,map函数是在父数据上执行的。下表总结了RDD的内部接口。
     

    设计接口的一个关键问题就是,如何表示RDD之间的依赖。我们发现RDD之间的依赖关系可以分为两类,即:(1)窄依赖(narrow dependencies):子RDD的每个分区至多依赖一个父分区(即与数据规模无关);(2)宽依赖(wide
dependencies):子RDD的多个分区可以依赖一个父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)。另一个例子见下图:

    


    区分这两种依赖很有用。首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;而宽依赖则需要获取所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的祖先产生冗余计算开销。
    【补充】
    第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成,并且父RDD的计算结果进行hash并传到对应节点上之后才能进行子RDD的计算。
    第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复在宽依赖情况下,丢失一个子RDD分区需要重新计算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的(groupbykey()),会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点(Tachyon中已经通过Edge算法实现,见Tachyon论文翻译)。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。

    通过RDD接口,Spark只需要不超过20行代码便可以实现大多数转换。下面给出一些RDD的实现实例:

    HDFS文件目前为止我们给的例子中输入RDD都是HDFS文件,对这些RDD可以执行:partitions操作返回各个数据块的一个分区(每个Partition对象中保存数据块的偏移),preferredLocations操作返回数据块所在的节点列表,iterator操作对数据块进行读取。

    map:任何RDD上都可以执行map操作,返回一个MappedRDD对象,该对象与父RDD具有相同的分区以及偏好位置。该操作传递一个函数参数给map,对其父RDD上的记录按照iterator的方式执行这个函数。
    union:在两个RDD上执行union操作,返回两个父RDD分区的并集。通过相应父RDD上的窄依赖关系计算每个子RDD分区(注意union操作不会过滤重复值,相当于SQL中的UNION ALL)。
    sample:抽样与映射类似,但是sample操作中,RDD需要存储一个随机数产生器的种子(随机数的起始值),这样每个分区能够确定哪些父RDD记录被抽样。
    join:对两个RDD执行join操作可能产生窄依赖(如果这两个RDD拥有相同的哈希分区或范围分区),可能是宽依赖,也可能两种依赖都有(比如一个父RDD有分区,而另一父RDD没有)。
    【补充】join过程见下图(基本过程是:先对需要连接的RDD进行cogroup函数操作。cogroup之后形成的新RDD,对每个key下的元素进行笛卡尔积操作)。

    


Implementation

    我们已经用scala代码实现了Spark系统。系统运行在Mesos集群管理之上,且可以与Hadoop、MPI以及其他应用进行资源共享(兼容)。每一个Spark程序作为一个独立的Mesos应用运行,且都有自己的driver(master)和workers。应用之间的资源共享由Mesos来处理。

    Spark可以通过Hadoop已经存在的插件接口读取来自任何Hadoop的输入资源。

Job Scheduling

    总的来说,我们的调度器跟Dryad类似,但我们还考虑了哪些RDD分区是缓存在内存中的。调度器根据目标RDD的Lineage图创建一个由stage构成的有向无环图(DAG)如下图所示。每个stage内部尽可能多地包含一组具有窄依赖关系的流水线转换(transformations)。stage边界的划分有两种情况:一是宽依赖上的Shuffle操作;二是已计算的分区,它可以缩短父RDD的计算过程。在stage内需要启动一组任务用于计算缺失的分区,直到目标RDD计算完成。
    


    图解:Spark怎样划分任务阶段(stage)的例子。实线方框表示RDD,实心矩形表示分区(黑色表示该分区被缓存)。要在RDD G上执行一个动作(action),调度器根据宽依赖创建一组stage,并在每个stage内部将具有窄依赖的转换流水线化(pipeline)。
本例不用再执行stage 1,因为B已经存在于缓存中了,所以只需要运行2和3。

    由于延迟调度的机制,调度器根据数据存放的位置将任务分配给机器(数据的本地性)。如果某个任务需要处理一个已经缓存好的分区,则直接将任务分配给拥有这个分区的节点。否则,如果需要处理的分区位于多个可能的位置(例如,由HDFS的数据存放位置决定),则将任务分配给这一组节点。
    对于宽依赖(例如需要Shuffle的依赖),目前的实现方式是,在拥有父分区的节点上将计算的中间结果物化(持久化),通过复制备份简化容错处理,这跟MapReduce中物化map输出很像。

    如果某个任务失效,只要stage中的父RDD分区可用,则只需在另一个节点上重新运行这个任务即可。如果某些stage不可用(例如,Shuffle时某个map输出丢失),则需要重新提交这个stage中的所有任务来并行计算丢失的分区。

    最后,虽然Spark中所有的计算都是因为action算子的触发才开始的。

Interpreter Integration(解释器的集成)

    像Ruby和Python一样,Scala也有一个交互式shell。基于内存的数据可以实现低延迟,我们希望允许用户从解释器交互式地运行Spark,从而在大数据集上实现大规模并行数据挖掘。

    Scala解释器通常根据用户输入的代码行,来对类进行编译,接着装载到JVM中,然后调用类中的一个函数进行处理。这个类是一个包含输入行上的变量或函数的单例对象,且会用一个初始化函数运行这行代码。例如,如果用户输入代码var x =
5,接着又输入println(x),则解释器会定义一个包含x的Line1类,并将第2行编译为println(Line1.getInstance().x)。

    在Spark中我们对解释器做了两点改动:
    (1)类传输:解释器能够支持基于HTTP传输类字节码,这样worker节点就能获取输入每行代码对应的类的字节码。
    (2)改进的代码生成逻辑:通常在每行代码行上创建的单态对象通过对应类上的静态方法被访问。也就是说,如果要序列化一个闭包,它会引用前面代码行中变量。比如上面的例子Line1.x,Java不会根据对象关系传输包含x的Line1实例。所以worker节点不会收到x。所以,我们将这种代码生成逻辑改为直接引用各个行对象的实例。下图说明了解释器如何将用户输入的一组代码行解释为Java对象。
     


    Spark解释器便于跟踪处理大量对象关系引用,并且便利了HDFS数据集的研究。我们计划以Spark解释器交互式地运行高级数据分析语言,比如类似SQL。

Memory Management

    Spark为持久化存储RDD提供三种选择:

    (1)内存存储反序列化的java对象;

    (2)内存存储序列化的数据;

    (3)磁盘存储方式。

    第一种选择可以提供最快的性能,因为java虚拟机可以本地化获取RDD。第二种选择可以让用户选择一个相对于空间受限的java对象图(UML图?)而言更加具有内存效率的方法(无需组织java对象,只是使用纯粹的数据对象),但是要以降低性能为代价。第三种方法对于过大而不能保存在RAM中的RDD是有用的,但是重计算会有消耗。

    为了管理有限的内存,我们在RDDs的级别使用LRU回收策略。当一个新的RDD分区被计算但是又没有足够的空间来存储。我们回收一个最近最少访问的RDD的分区,除非这个RDD与包含那个将要被计算的新分区的RDD是一个RDD,在这种情况下,我们将旧的分区保存在内存中进而阻止相同RDD中的partiton不停地换进换出。这是很重要的因为大多数的操作会在整个RDD上运行tasks,因此内存中的分区在将来被使用是很可能的。我们发现这个默认的策略至今为止在我们的应用中都工作的很好。我们也为用户提供了更深一步的控制策略:为每一个RDD提供持久化存储。

    最后要说的是,现在集群上的每一个Spark实例都有自己独立的内存空间。将来,我们计划探究通过标准的内存管理器管理Spark实例来共享RDDs。

Support for Checkpointing

    尽管RDD中的Lineage信息可以用来故障恢复,但对于那些Lineage链较长的RDD来说,这种恢复可能很耗时。例如上述的Pregel任务,每次迭代的顶点状态和消息都跟前一次迭代有关,所以Lineage链很长。所以,在RDD上执行检查点设置就很有效。

    一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。这种情况下,集群中一个节点故障(导致f父RDD所在的磁盘数据丢失)可能导致每个父RDD的数据块丢失,因此需要全部重新计算。相反,如果在窄依赖之上使用检查点的操作是没有价值的。如果一个节点失败了,可以通过“血统”进行重新计算,这样带来的开销也仅仅是复制RDD的几分之一而已。

    Spark当前已经提供了checkpoint的接口,但是那些数据需要进行checkpoint操作的选择权留给了用户。然而我们也研究了如何执行自动的检查点设置,因为我们的调度器知道每一个数据集的大小以及它第一次计算所占用的时间,所以选择一个更优的RDD数据集设置检查点是可以最小化系统数据恢复的计算时间。

    值得注意的是,因为RDD是只读的,所以不需要任何一致性维护(例如写复制策略,分布式快照或者程序暂停等)带来的开销,也不会影响到后台执行检查点设置操作。

Evaluation

    我们在Amazon EC2[1]上进行了一系列实验来评估Spark及RDD的性能,并与Hadoop及其他应用程序的基准进行了对比。总的说来,结果如下:

    (1)对于迭代式机器学习以及图相关的应用,Spark比Hadoop快20多倍。这种加速比是因为:Spark中数据以Java对象的形式存储在内存中,进而避免了过多的I/O操作以及反序列化操作。
    (2)用户编写的应用程序执行结果很好。特别地,Spark分析报表比Hadoop快40多倍。
    (3)如果节点发生失效,通过重建那些丢失的RDD分区,Spark能够实现快速恢复。
    (4)Spark能够在5-7s延时范围内,交互式地查询1TB大小的数据集。
    我们的基准测试首先从一个运行在Hadoop上的具有迭代特征的机器学习应用和PageRank开始,然后评估在Spark中当工作集不能适应缓存时系统容错恢复能力,最后讨论用户应用程序和交互式数据挖掘的结果。

    除非特殊说明,我们的实验使用m1.xlarge EC2 节点,4核15GB内存,使用HDFS作为持久存储,块大小为256M。在每个作业运行执行时,为了保证磁盘IO开销开销的测试更加精确,我们清理了集群中每个节点的操作系统缓存。

Iterative Machine Learning Applications

    我们实现了2个迭代式机器学习(ML)应用,Logistic回归和K-means算法,在如下系统之间进行性能对比:
    (1)Hadoop:Hadoop 0.20.0稳定版。
    (2)HadoopBinMem:在首轮迭代中通过将输入数据转换成为开销较低的二进制格式来减少后续迭代过程中文本解析的开销,并将其保存在基于内存的HDFS实例中。
    (3)Spark:基于RDD的系统(在首轮迭代中缓存Java对象以减少后续迭代过程中解析、反序列化的开销)
    我们使用同一数据集在相同条件下运行Logistic回归和K-means算法:在25-100台机器,执行10次迭代处理100G输入数据集。两个作业的关键区别在于每轮迭代的单个字节上的计算量不同。K-means的迭代时间取决于更新聚类坐标(计算过程)耗时,Logistic回归是非计算密集型的,但是在I/O操作和解析过程中非常耗时。
    由于典型的机器学习算法需要数10轮迭代才可以收敛,我们分别统计了首轮迭代和后续迭代计算的耗时,并从中发现,通过RDD进行数据共享极大地加快了后续迭代的速度。

    首轮迭代。在首轮迭代过程中,三个系统都是从HDFS中读取文本数据作为输入。如图7所示,实验中Spark略快于Hadoop,主要是因为Hadoop中的master节点和worker节点之间基于心跳协议来发送信号带来了开销。HadoopBinMem是最慢的,因为它通过一个额外的MapReduce作业将数据转换成二进制格式会带来开销,并且HadoopBinMem必须通过网络将二进制文件在内存中进行备份操作,这也会带来开销。
     后续迭代图7显示了后续迭代的平均耗时,图8对比了不同大小集群条件下耗时情况。从图8(a)我们发现,在100个节点上运行Logistic回归程序,Spark比Hadoop、HadoopBinMem分别快25.3、20.7倍。从图8(b)可以看到,尽管是更加典型的计算密集型的K-means应用,Spark仍旧比Hadoop、HadoopBinMem分别快1.9、3.2倍,这是因为K-means程序的计算开销是主要的(用更多的节点有助于提高计算速度的倍数)。
    【对于图7而言,需要注意】后续迭代中,Hadoop仍然需要从HDFS读取文本数据作为输入,所以从后续迭代中Hadoop的迭代时间并没有明显的改善。但是,使用预先转换的SequenceFile文件(Hadoop内建的二进制文件格式)的 HadoopBinMem,在后续迭代中节省了解析的代价,但是仍然会带来其他的开销,如从HDFS读SequenceFile文件并转换成Java对象等。因为Spark是直接读取缓存于RDD中的Java对象的,随着集群尺寸的线性增长,迭代时间大幅下降,性能得到提升。

    


    


    理解速度提升。我们非常惊奇地发现,Spark胜过基于内存存储二进制数据的Hadoop(HadoopBinMem)高达20倍,Hadoop运行慢是由于如下几个原因:

    (1)Hadoop软件栈限定的最小开销;
    (2)HDFS提供数据的开销;
    (3)将二进制记录转换成内存Java对象过程中的解序列化的开销。
    我们依次研究了上述的每一个因素:
     为了估测1,我们运行空的Hadoop作业,仅仅执行作业的初始化、启动任务、清理工作就至少耗时25秒。对于2,我们发现为了服务每一个HDFS数据块,HDFS进行了多次内存复制以及计算校验和操作,必然会带来相应的开销。

    为了估测3,我们在单个节点上运行了微基准程序,在输入的不同形式的256M数据上计算Logistic回归。特别地,对于来自基于内存的HDFS和本地内存两种情况下的文件,我们对比直接处理输入的文本和处理转换为二进制后的文件的性能。
    首先,基于内存的HDFS文件和基于内存的本地文件的读取过程,前者多耗时2秒。其次,直接的文本和二进制格式文件的解析过程,前者多耗时7秒(Spark RDD除外)。最后,即使是处理基于本地内存的文件,预解析的二进制文件转换为内存中的Java对象,也要耗时3秒。然而,通过直接将RDD缓存为内存中的Java对象,就会避免上述的开销,最终只需要耗时3秒。
    


PageRank

    通过使用存储在HDFS上的54G大小的Wikipedia导出的数据,我们比较了Spark与Hadoop在PageRank计算方面的性能。PageRank算法通过10轮迭代处理了大约400万文章的链接图数据,图10显示了在30个节点上,基础的Spark处理速度是Hadoop的2.4倍。改进(对RDD进行Hash分区,来让迭代过程变得连贯)后速度提升到7.4倍,这些结果数据是随着节点扩展到60个而同步放大的。
    


    我们也通过在Spark上实现Pregel来运行PageRank算法进行评估,在后面会提到。迭代次数和图10类似,但是耗时要多4秒,因为Pregel会在每一次的迭代过程中运行额外的操作来让节点“投票”决定是否结束当前job。

    【补充】Pregel:Google分布式图计算框架,主要用于图遍历、最短路径、PageRank算法等。
http://www.360doc.com/content/11/0609/19/6986090_122743127.shtml

Fault Recovery

    基于K-means算法应用程序,我们评估了在单点故障(SPOF)时使用Lneage信息创建RDD分区的开销。图11显示了,K-means应用程序运行在75个节点的集群中进行了10轮迭代,我们在正常操作和进行第6轮迭代开始时一个节点发生故障的情况下对耗时进行了对比。没有任何失败的情况下,每轮迭代启动了400个任务处理100G数据。

    


    第5轮迭代结束时,交互时间大约为58秒,第6轮迭代时,一个节点被Kill掉,该节点上的任务都被终止,存储在其上的RDD的分区也丢失了。Spark调度器调度这些任务在其他节点上重新并行运行:重新读取一直的输入数据并基于Lineage信息重建RDD,这使得迭代计算耗时增加到80秒。一旦丢失的RDD分区被重建,平均迭代时间又回落到58秒。

    需要注意的是,基于检查点的错误恢复机制(纯粹的检查点操作,并没有“血统”机制),恢复过程需要执行很多次的迭代操作。而且,系统需要通过网络将应用中多达100GB的数据进行复制,因为复制,Spark或许需要花费两倍的内存来完成备份工作。相反,我们例子中的RDD的“血统”图的存储仅仅需要10KB大小的空间(存储在磁盘)。

Behavior with Insufficient Memory(内存不足)

    到现在为止,我们能保证集群中的每个节点都有足够的内存去缓存迭代过程中使用的RDD。那么问题来了,如果没有足够的内存来缓存一个作业的工作集,Spark又是如何运行的呢?在实验中,我们通过在每个节点上限制缓存RDD所需要的内存资源来对Spark进行配置,在不同的缓存配置条件下执行Logistic回归,结果如图12。我们可以看出,随着内存的减小,性能平缓地下降。

    


User Applications Built with Spark

    In-Memory Analytics

    视频分发公司Conviva使用Spark极大地加快了一些数据分析报告的处理速度,之前是在Hadoop上运行的。例如,一个报告的处理相当于一系列的Hive[1]的查询处理过程,过程中对关于用户各种各样的统计数据进行了计算,这些查询作用在相同的数据子集上(数据满足用户提供的条件(过滤器)),但是在不同分组的字段上执行聚合操作(聚合函数)(SUM、AVG、COUNT
DISTINCT等)需要使用单独的MapReduce作业。
    通过执行Spark中的查询操作将共享的数据子集加载到一个RDD里面,该公司可以将报告的处理速度提升40倍。在Hadoop集群上处理200G压缩数据生成的报告耗时20小时,而使用Spark基于96G内存的2个节点只需耗时30分钟即可完成,主要是因为Spark仅仅将符合用户要求的信息的行列数据存储到了一个RDD里面,而不需要存储整个解压缩的文件。

    Traffic Modeling(城市交通建模
    在Berkeley的Mobile Millennium项目中,研究人员基于一系列分散的汽车GPS监测数据,使用并行化学习算法来推算公路交通拥堵状况。数据来自10000个互联的公路线路网,还有600000个由汽车GPS装置采集到的点到点的旅行时间的样本数据,这些数据记录了汽车在两个地点之间行驶的时间(每一条路线的行驶时间可能跨多个公路线路网)。使用一个交通模型,可以估计跨多个公路网行驶的耗时,由此,系统能够估算交通的拥堵状况。研究人员使用Spark实现了一个可迭代的EM(最大期望)算法,算法迭代地重复执行map、reduceByKey操作。应用从20个节点扩展到80个节点(每个节点4核),如图13(a)所示。
    Twitter Spam(垃圾邮件)分类。Berkeley的Monarch项目使用Spark识别Twitter消息上的Spam链接。他们在Spark上实现了一个Logistic回归分类器,并使用分布式的reduceByKey操作对梯度向量并行求和。图13(b)显示了基于50G数据子集的分类器的训练结果,整个数据集包括250000的URL、至少10^7个与网络相关的特征/维度以及每个URL对应页面的内容属性。随着节点的增加,这并不像交通应用程序那样近似线性,主要是因为每轮迭代的固定通信代价较高。
    


Interactive Data Mining(交互式数据挖掘)

    为了展示Spark交互式查询大数据集的能力,我们在100个m2.4xlarge EC2实例(8核68G内存)上使用Spark分析两年间的1TB的Wikipedia页面浏览日志数据。在整个输入数据集上简单地查询如下内容以获取页面浏览总数:
    (1)全部页面;
    (2)页面的标题能精确匹配给定的关键词;
    (3)页面的标题能部分匹配给定的关键词。

    图14 显示了分别在整个、1/2、1/10的数据集上查询的响应时间,甚至1TB数据在Spark上查询仅耗时5-7秒,这比直接操作磁盘数据快几个数量级,例如,从磁盘上查询1TB数据耗时170秒,这表明了RDD缓存使得Spark成为一个交互式数据挖掘的强大工具。
    


Discussion

Expressing Existing Programming Models

    RDD可以高效地实现目前已经被提议独立的集群编程模型。所谓高效,就是RDDs不仅仅可以实现跟这些模型一样的输出,还可以优化这些框架的性能,比如将特定的数据保存在内存;将RDD进行分区进而实现节点交互的优化;以及进行高效的数据恢复。这些可以使用RDDs进行实现的模型包括:

    MapReduce

    该模型可以使用Spark里面的flatMap和groupByKey算子来实现,或者是在有combiner的前提下用reduceByKey来实现。

    DryadLINQ

    该系统相对于MapReduce会提供更加广泛的算子,这些算子基本上都是Spark中的的整体计算转换算子,比如(map, groupByKey, join等
    SQL

    类似于DryadLINQ,SQL在记录集上执行数据并行查询的操作。
    Pregel

    Google的Pregel是专门用于图处理应用的模型,这个模型看起来跟其他面向数据集的程序模型很不一样。在Pregel(基于Bulk Synchronous
Parallel,整体同步并行计算模型)中,程序由一系列超步(Superstep)协调迭代运行在每个超步中,各个顶点执行用户函数,并更新相应的顶点状态,改变图的拓扑结构,然后向下一个超步的顶点集发送消息。这种模型能够描述很多图算法,包括最短路径,双边匹配和PageRank等。
    使用RDDs来完成上述模型的主要原因是Pregel对所有定点的每一次迭代中使用了相似的用户程序。因此,我们可以将一个RDD中每一次迭代的定点状态信息进行存储,然后用该用户函数执行整体转换(flatmap)并将相关信息存储在新生成的RDD里面。我们可以将这个新生成的RDD和定点的状态进行join操作来进行定点的信息交换更新操作。还有一点很重要,RDDs可以允许我们像Pregel一样将定点状态信息存储在内存中,这样就可以通过控制RDD的分区来优化集群节点之间的交互了,也可以支持部分的错误回复。

    Iterative MapReduce

    最近的好几个系统,包括Hadoop和Twister都提供了交互式的MapReduce模型,在该模型中,用户可以给系统一系列的job来迭代执行。此系统可以通过迭代连续地进行数据划分,且Twister可以将数据保存在内存当中。
    Batched Stream Processing

    研究人员最近提议了好几个增量的处理系统,这些系统都可以周期性地更新数据。例如,一个每15分钟对广告点击统计进行更新的应用,该应用可以将之前15分钟的状态和最新的log数据进行合并从而得到最新的状态。这些系统执行和Dryad类似的整体操作,但是会在分布式文件系统中存储状态。将中间状态放入RDDs中可以加速处理过程。
    Explaining the Expressivity of RDDs

    为什么RDDs可以实现各种程序模型?因为附加在RDD上面的限制几乎不会影响到大多数的并行应用。特别地,虽然RDDs仅仅只能通过整体的转换操作来创建,但是很多的并行程序自然地会对记录采用相同的操作(适合整体操作的模式)。相似地,RDDs的不可变性也不是什么障碍。因为我们可以创建多个RDD来来代表相同数据集的不同版本。事实上,目前使用文件系统的很多的MapReduce应用都不支持文件的更新操作。 
  

Leveraging RDDs for Debugging

    当我们最开始设计RDDs进行广义地(“血统”)针对错误容忍的重计算的时候,这个属性也有助于debug。特别地,通过记录一个job期间创建的“血统”信息,我们之后可以重新计算这些RDDs,也可以让用户交互式地查询这些信息,进而可以在一个单进程的debugger过程中通过重新计算对应的RDD分区来重新运行来自一个job的任何的task(有针对性地获取“血统”信息,并有针对性地对单个partition进行计算来针对性地重运行单个task)。不像传统分布式系统那样通过重新运行app来进行bug调试的调试者,必须在较多的nodes间搞清楚事件的执行顺序。相对而言,RDDs的debug过程没有增加任何负载,因为只有RDD的“血统”图需要被记录,我们目前也正基于上述的观点开发一个Spark调试器。

Related Work

Cluster Programming Models

    目前的集群模型分为好几类:
    (1)数据流模型。像MapReduce、Dryad、Ciel那样拥有一系列处理数据的算子,但是是使用稳定的存储系统来进行数据的共享的。
    相对于那些稳定的存储系统,RDDs代表一种更加高效的数据共享抽象,因为RDDs避免了数据复制、I/O以及序列化的开销。
    (2)拥有高级程序接口的数据流系统。比如DryadLINQ、FlumeJava等,这些系统会提供相关的API使得用户可以通过map和join算子进行并行的数据获取。然而,在这些系统中,并行的数据获取对象可以是磁盘中的文件或者是用于查询过程的暂时的数据集,且系统会通过相同的算子来进行流水线式的数据处理(比如一系列的map操作),但是相同的算子的处理过程中,数据共享的效率太低。
    为了更加便利,我们在并行获取数据的过程中以正常的Spark API为基础,但是在口后背后会提供RDDs作为存储抽象。

    (3)为特定的需要数据共享的应用提供高级接口的系统。例如,Pregel支持迭代的图应用,而Twister和HaLoop是处理迭代式的MapReduce的。然而,这些系统会为他们支持的计算模块提供数据共享。不会为用户提供普遍的数据共享的抽象。例如,用户无法使用Pregel或者Twister来将一个数据集加载到内存,并决定哪个数据查询的操作可以在上面执行。
    但是RDDs提供了一个分布式存储抽象并且可以支持哪些专门的系统无法支持的应用,比如交互式数据挖掘。

    (4)暴露了可分享的可变的状态的一些系统。例如,Piccolo让用户可以运行一些并行函数来读取或者更新一个分布式hash表的元素。DSM(Distributed
shared memory )以及key-value存储(比如RAMCloud提供了相似的模型)也是类似的。
    RDDs与上述系统有两个方面的不同。第一,RDD基于算子map、sort和join等提供了高级编程接口,然而,Piccolo和DSM仅仅只能读取以及更新hash表中的元素。第二,Piccolo和DSM系统通过检查点以及回滚操作来执行数据恢复,其开销要比RDDs的基于“血统”的错误容忍策略大很多。

    Caching Systems

    Nectar能够通过识别带有程序分析的子表达式,跨DryadLINQ作业重用中间结果,这种能力将会加入到基于RDD的系统。但是Nectar并没有提供In-Memory缓存(使用一般的分布式文件系统),也不能够让用户显式地控制应该缓存那个数据集,以及如何对其进行分区。Ciel和FlumeJava同样能够记住任务结果,但不能提供In-Memory缓存并显式控制它的缓存方式。
    Ananthanarayanan系统通过利用数据存取的临时性和空间本地性,进而增加了一个基于内存缓存的分布式文件系统。这种方法提供了针对文件系统更快的数据存取速度,但是和共享中间结果的RDDs比起来效率还是不行,因为该系统还是需要应用将stage操作之间的结果写入文件系统进而共享数据。

   Lineage
    在科学计算和数据库领域,表示数据的Lineage和原始信息一直以来都是一个研究课题。对于一些应用,如需要解释结果以及允许被重新生成、工作流中发现了bug或者数据集丢失需要重新计算数据。RDD提供了一个并行的编程模型,在这个模型中使用的“血统”机制来表示是非常容易的,因此它可以被用于容错。
    基于“血统”的恢复机制其实与MapReduce以及Dryad的恢复机制是类似的,也会追踪tasks构成的DAG图的依赖关系。然而在这些系统中,“血统”信息会在一个job结束之后丢失,最后还是需要采用在存储系统进行复制备份的方法来共享计算的数据。相反,RDDs用"血统"机制高效地将数据缓存至内存而不需要复制和I/O的开销。
    

   Relational Databases

    从概念上看,RDD类似于数据库中的视图,RDD的持久化过程类似于视图的物化。然而,像DSM系统一样,数据库允许典型地读写所有记录,通过操作日志和数据的日志来实现容错,还需要花费额外的开销来维护一致性。RDD编程模型粗粒度的转换模型可以避免这些开销。

Conclusion

    我们提出的RDD是一个高效的,通用的,具有错误容忍的应用于共享数据的集群应用的抽象。RDD广泛支持并行应用,还包括已经提出来的很多专门的程序模型处理的迭代计算以及那些专门的程序模型不支持的新应用。不像已经存在的通过数据复制进行容错处理的集群存储抽象,RDD基于粗粒度的转换操作提供了一些列的API,这些API可以基于“血统”机制进行高效率的数据恢复。使用RDD的Spark系统实现处理迭代式作业的速度超过Hadoop大约20倍,而且还能够交互式查询数百GB的数据。

    
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: