您的位置:首页 > 其它

《Data-intensive Text Processing with MapReduce》读书笔记第3章:MapReduce算法设计(1)

2011-07-12 09:57 549 查看
爽约了。本来应该昨天更新的,结果昨天收到刚买的Focus手机过于兴奋把正事忘记了,十分抱歉!

本读书笔记的目录地址:/article/6988681.html

引言

MapReduce的强大很大程度上来自于它的简单,程序员只需要准备以下几个要素:

输入数据

mapper和reducer

划分器与合并器(这两个是可选的)

但另一方面,这意味着所有希望使用MapReduce实现的算法都得以MapReduce模型特有的方式表达(map-reduce)

本章通过一些简单的示例介绍如何在MapReduce中处理一些常见的情况(即MapReduce中的“设计模式”)。本章的一些概念和方法将在后续章节发挥作用。

同步可能是MapReduce中最难以理解和掌握的一方面。事实上,在MapReduce中只有一个阶段的处理存在同步问题:mapper处理完毕后将输出的中间结果key-value对进行分组/排序后分别送入对应reducer的阶段(shuffle and sort)。在这个阶段以外的其他时候,各个mappper/reducer都是独立运行,互不相干的,因而在这些时候,有很多因素程序员无法控制:

一个特定的mapper/reducer在哪个节点上运行

每个mapper/reducer何时开始工作,何时工作完毕

每个输入key-value对被哪个特定的mapper处理

每个中间结果key-value对被哪个特定的reducer处理

但即使如此,程序员还是有一些其他手段可以控制MapReduce的执行方式和数据流向的:

使用自定义的数据结构来达到存储额外信息/通讯的目的

执行自定义的预处理/后处理操作

在一个mapper/reducer对象内部维护一些与当前上下文相关的信息

控制中间结果的分组/排序方式,从而控制中间结果进入reducer的顺序

控制划分器的划分行为,从而控制什么样的reducer接受什么样的中间结果

事实上,很多算法无法简单地表示为一步map-reduce操作。在这种情况下,需要仔细设计,将算法划分为多步map-reduce,且上一步reduce阶段输出的结果,刚好可以作为下一步map的输入

很多算法是迭代的:算法在终止条件满足前不断迭代执行某一操作。迭代条件在MapReduce中不易表示,这时需要引入外部程序作为“驱动器”协调基于MapReduce的迭代操作。

本章主要关注两点:

可扩展性:确保算法在更大规模数据集上运行不会出现瓶颈

算法效率:确保算法不会在并行性本身这点上消耗过多不必要的资源(例如网络传输)。最理想的效率是没有任何并行性开销,算法具有线性可扩展性,即集群规模不变,数据规模加倍时,处理时间也加倍;数据规模不变,集群规模加倍时,计算时间减半。

本章安排如下:

3.1节讲解局部合并(local aggregation)的重要性,将对合并器(combiner)进行详细讲解。局部合并可以合并mapper输出结果,从而减少需要进行网络传输的数据量。介绍“mapper内合并(in-mapper combining)”设计模式。

3.2节通过单词共现矩阵(word co-orrcurance matrices)构造的示例,讲解了两个常见的设计模式:对(pairs)与带(stripe)。这是组织key-value中数据结构的常见模式。这两种模式对于解决从大量样本观察数据中追踪协同事件这样的问题具有意义。

3.3节将3.2节的问题再进一步,改为计算相对频度(relative frequency)矩阵,从而引入反序(order inversion)模式。反序模式通过将算法中的操作序问题转化为MapReduce中对中间结果的排序问题提高算法效率。通常包含统计功能的算法需要反复遍历数据集(例如归一化,需要扫描一遍计算加和,然后二次扫描做除法),而通过巧妙地定义排序规则(使用反序模式),可以避免反复遍历的时间开销与维护统计结果的空间开销。

3.4节介绍了一种通用的二次排序法,可以用来对reducer接受的具有同一key值的key-value对序列进行二次排序。我们把这种模式称为“值键转换(value-to-key conversion)”。

3.5节介绍在关系数据(relational data)上进行连接(join)操作的方法,分为reduce端(reduce-side)、map端(map-side)和基于内存(memory-backed)三种。

3.1 局部合并

大规模数据处理中最重要的一方面问题就是数据的传输(在MapReduce中就是从数据的生产者mapper到传输者reducer)。传输通常包括磁盘I/O与网络传输。在Hadoop中,中间结果在通过网络传输至reducer之前通常先写入磁盘。无论是磁盘延迟还是网络延迟,相对于CPU的计算效率都是比较大的开销。而局部合并可以合并中间结果,降低需要写入/传输的数据规模,从而提高算法效率。好的局部合并操作对于高效的MapReduce算法至关重要。

局部合并主要通过两个手段达成:

设计合适的合并器(combiner)对mapper产生的中间结果进行合并

利用单个mapper内的上下文信息

通过局部合并可以有效地减少中间结果key-value对的数量(number)和规模(size),从而提高算法效率。

3.1.1 合并器(combiner)与mapper内合并(in-mapper combining)

仍然以2.2节中的单词计数统计程序为例说明局部合并(local aggregation)的原理。为了方便起见,我们将当时的伪代码再次列出,见图3.1. mapper输出(term,1)对 ,reducer接受这些对并将具有相同term值的1加和,最后输出每个term的计数。



图3.1 基本单词统计算法的伪代码

一种进行局部合并的方法是使用合并器(combiner,2.4节提到过)。combiner是一种优化机制,用来减少mapper产生的中间结果key-value对的数量。combiner可以理解为一种“迷你reducer”,每个combiner对应处理一个mapper输出的所有key-value对。在本例中,经过combiner的合并,每个mapper输出的key-value对数量从所有单词的数量下降到所有不同单词的数量。

图3.2给出了一个基本单词统计算法的改进算法。算法仅改进了mapper,因此仅将改进后的mapper伪代码列出(reducer略):



3.2 改进后的单词统计算法,使用一个关联

数组为单词计数,计数范围为一个文档内

算法在mapper中使用一个关联数组(Java中即为Map)为单词计数,计数范围为一个文档内。改进后,mapper不再为文档中的每个单词输出一个key-value,而只为每个不同单词输出一个key-value. 由于每个文档中几乎都有大量重复出现的单词(例如the),因此这个方法能够大量减少中间结果,尤其是大文档。

针对Hadoop编程特性,我们可以更进一步进行设计。根据2.6节,每个map任务(task)对应生成一个mapper对象(Java对象),在mapper对象开始处理操作前,可以触发自定义的初始化操作(Initialize)。在本例中,这个自定义初始化操作即为“生成一个空关联数组”。更进一步,可以扩大这个关联数组的计数范围,使其不仅仅局限于一个文档内,而将其扩展至一个mapper对象处理的所有文档。mapper处理完所有文档后,可触发用户自定义的后处理操作(Close),在这里,Close操作可以定义为“遍历这个关联数组,输出中间结果”。进一步改进后的算法如图3.3所示。



3.3 进一步改进后的单词计数算法

在这个算法中,我们并没有实现独立的combiner,仅通过一个包含于mapper中的合并功能,就打成了这个算法中可以达成的局部合并目标。

mapper内合并的优点

我们把这种模式称为“mapper内合并(in-mapper combining,以后均以这个名字称呼)”。这种模式有两个优点

对于when(何时进行合并)和how(如何合并)的精确控制。相比之下,独立combiner的执行语义在MapReduce中并没有严格定义:执行框架仅仅将combiner作为一个可选执行步骤,至于是否执行,执行几次,是不可控的。因此很多程序员都选择实现自己的mapper内合并方法,而不使用独立combiner。

相比独立combiner,mapper内合并更有效率。因为独立combiner虽然能够减少通过网络传输的中间结果key-value对,但却不能减少mapper生成的key-value对(因为独立combiner运行于map步骤完成以后)。这使得独立combiner不能减少对象的生成和销毁数(对象生成和垃圾回收gc也是有开销的,而且当对象数量很多的时候,这种开销必须正视)。不仅如此,由于mapper生成的中间结果首先写入磁盘,过多的对象还会导致对象序列化(serialization)/反序列化(deserialization)以及磁盘I/O的增加。而mapper内合并对解决这些问题均很有效。

mapper内合并的缺点

mapper内合并也有缺点

mapper内合并的本质上是通过在mapper对象内维护状态记录实现的,这破坏了函数式编程的特性(译者注:可见wiki里pure function的定义 http://en.wikipedia.org/wiki/Pure_function)。在实际编程中为了效率起见牺牲一部分理论纯粹性是很常见的,对于mapper内合并,这么做是完全值得的。

在多个输入样例(input instance,在MapReduce中即为key-value对)间维护状态记录意味着算法的行为有可能与输入序列的顺序相关。这有可能造成程序bug,而且这种bug在数据集很大的情况下将难以调试(当然,在单词计数算法中,输入序对于算法正确性显然没有影响)。

扩展性不足:mapper内合并受限于内存大小。对于单词统计算法来说,如果一个mapper接受的输入集拥有很大的单词量(这里的单词量是指不同单词的数量,即number of unique words),那么维护mapper对象内各单词计数的关联数组将会很大。因此对于图3.3的算法来说,其规模是有上限的(上限是单mapper节点能够使用的最大内存)。

对于这个问题也有折中的办法解决,比如周期性地将内存中的数据写入磁盘。对于单词统计来说,如果没有足够的内存使得mapper可以将全部输入统计完毕后再输入,则可以采取每n个key-value对作为一批处理的方法:每处理完n个key-value对,将当前计数关联数组写入磁盘并清空。关联数组占据的这块内存实际上起到一个处理缓冲区(buffer)的作用,缓冲区的大小对算法性能有很大影响,而由于内存有限,缓冲区也不可过大。所以缓冲区尺寸的选择需要根据计算资源和算法仔细选取。

关于局部合并的总结

局部合并能带来多大的性能提升?这取决于很多因素:

中间结果的key空间大小

key在空间内的分布

每个mapper产生的key-value对数量

……

局部合并性能提升的核心在于合并那些具有相同key值的key-value对

对于MapReduce中的reduce掉队现象,某些情况下局部合并也是一种解决之道。例如对于单词计数算法,如果没有局部合并,那么不同的reducer分配到的key-value对数量可能会有明显差别(试想reducer1分配到所有("the",1),而reducer2分配到所有("dog",1),则reducer1接受的key-value对一定远远多于reducer2),从而可能造成reduce掉队。

3.1.2 使用局部合并时的算法正确性

对combiner的两个约束

对于combiner来说:

由于其执行由MapReduce执行框架负责,因此算法如要保证正确性,必须保证算法结果与combiner无关(即combiner是否执行,执行后产生什么中间结果对算法的最终结果无影响)

MapReduce中,mapper的输入数据就是reducer的输入数据,因此这两种数据的格式是一致的。conbiner必须服从于这个前提(即combiner的输入数据、输出数据必须为mapper的输出格式,即reducer的输入格式)。

一个求平均数算法

首先考虑一个简单的例子:现在要处理一堆类型为<字符串, 整数>的key-value对,目的是输出每个不同key值对应的平均value值。为了更浅显易懂,我们用一个现实中的应用解释一下:现在要对一个大型web站点的用户访问日志(log)信息进行分析,日志为一(用户id, 时长)对的序列,每一个这样的key-value对记录了一次某用户对该网站的访问,其中用户id为该访问用户的id,时长为该次访问会话(session)持续的时间长度。现在需要根据这个日志信息进算每个用户每次访问该网站的平均会话时长。

图3.4给出了解决这一问题的基本算法。



图3.4 一个简单的求各key平均value的MapReduce算法

这个算法不包括合并器。mapper是一个简单的等价函数,直接将传入的(t,r)对输出。reducer根据输入的(t, [r1,r2...])计算各个t值对应的平均r值。

算法显然正确可用,但有着与3.1中基本单词计数算法同样的缺点:需要将所有mapper产生的key-value对通过网络传输给reducer,这是效率很低的做法。但与单词计数不同,不能简单地将reduce操作作为combiner使用。如果你不信,我们可以这么做一下试试:combiner计算局部数据集的平均值,并将其作为中间结果输入给reducer,reducer根据这个结果计算最终的平均值。根据这个规则构造的反例足以证明这个做法是错误的:

Mean(1,2,3,4,5)≠Mean(Mean(1,2),Mean(3,4,5))

求平均数算法:错误的combiner应用

那应该如何在这个算法中应用combiner呢?我们进行了尝试,图3.5给出了一种计算结果正确,但违背MapReduce程序设计原则的解决方法。



图3.5 一个计算结果正确,但违背

MapReduce程序设计原则的解决方法

通过将和与计数分开存放,图3.5所示算法能够正确计算出每个t值对应的平均r值。这是我们第一次使用自定义的数据类型。在前面的计算中,我们的key和value都是基本类型的数据。通过自定义数据类型,我们可以构造更为复杂的数据,将计算所需的数据组织到一起。

但这个算法是不正确的,因为它违背了combiner的设计原则(mapper输出格式与reducer输入格式一致。combiner的输入数据、输出数据必须为mapper的输出格式,即reducer的输入格式)

再次强调一点,combiner的执行是由MapReduce执行框架控制的。而在图3.5所示的例子中,combiner不执行和执行时送入reducer的是两种完全不同的数据类型(不执行时为(t,[r1,r2...]),执行时为(t,[(s1,c1),(s2,c2)...])))。而如果combiner被超过1次,第2次送入combiner的将不是(t,[r1,r2...])类型的数据,而是(t,[(s1,c1),(s2,c2)...])). 总之,combiner输入和输出数据类型的不一致,是这个算法的错误所在

求平均数算法:针对combiner的修正

图3.6给出了一个针对图3.5所示算法中存在问题加以改进的平均数算法。



图3.6 针对图3.5所示算法中存在的问题修正后的求平均数算法

通过改变mapper的输出类型,mapper输出类型、combiner输入/输出类型、reducer输入类型三者达到了一致,因此图3.6所示算法是一个正确、可用的MapReduce算法。

使用mapper内合并

图3.7展示了另一种局部合并优化后的求平均数算法,使用mapper内合并。



图3.7 使用mapper内合并优化后的求平均数算法

具体的做法是在mapper对象中维护两个关联数组,关联数组S记录和,T记录计数。mapper处理完所有数据后,对于所有处理过的t值,输出(t, (S{t}, C{t}))即可。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐