您的位置:首页 > 其它

Yahoo!教程:MapReduce

2016-07-29 00:00 239 查看
本文整理自:http://developer.yahoo.com/hadoop/tutorial/module4.html

1、MapReduce基础


(1)函数式编程概念
MapReduce 程序是设计用来并行计算大规模海量数据的,这需要把工作流分划到大量的机器上去,如果组件(component)之间可以任意的共享数据,那这个模型就没法扩展到大规模集群上去了(数百或数千个节点),用来保持节点间数据的同步而产生的通信开销会使得系统在大规模集群上变得不可靠和效率低下。
实际上,所有在MapReduce上的数据元素都是不可变的,这就意味着它们不能够被更新。如果在一个mapping任务中你改变了一个输入键值对,它并 不会反馈到输入文件;节点间的通信只在产生新的输出键值对((key,value)pairs)时发生,Hadoop系统会把这些输出传到下一个执行阶段。
(2)列表处理(List Processing)
从概念上讲,MapReduce程序转变输入数据元素列表成输出数据元素列表。一个MapReduce程序会重复这个步骤两次,并用两个不同的术语描述:map和reduce,这些术语来自于列表处理语言,如LISP,Scheme,或ML。
(3)Mapping数据列表(Lists)
MapReduce程序的第一步叫做mapping,在这一步会有一些数据元素作为Mapper函数的输入数据,每次一个,Mapper会把每次map得到的结果单独的传到一个输出数据元素里。



图4.1 Mapping通过对输入数据列表中的每一个元素应用一个函数创建了一个新的输出数据列表

这里举一个map功能的例子:假设你有一个函数toUpper(str),用来返回输入字符串的大写版本。你可以在map中使用这个函数把常规字符串列表转换成大写的字符串列表。注意,在 这里我们并没有改变输入字符串:我们返回了一个新的字符串,它是新的输出列表的组成部分之一。

(4)Reducing数据列表(Lists)
Reducing可以让你把数据聚集在一起。reducer函数接收来自输入列表的迭代器,它会把这些数据聚合在一起,然后返回一个输出值。



图4.2 通过列表迭代器对输入数据进行reducing操作来输出聚合结果

Reducing一般用来生成”总结“数据,把大规模的数据转变成更小的总结数据。比如,"+"可以用来作一个reducing函数,去返回输入数据列表的值的总和。

(5)把它们一起放在MapReduce中
Hadoop的MapReduce框架使用了上面的那些概念并用它们来处理大规模的数据信息。MapReduce程序有着两个组件:一个实现了mapper,另一个实现了reducer。上面描述的Mapper和Reducer术语在Hadoop中有了更细微的扩展,但基本的概念是相同的。
 键和值:在MapReduce中,没有一个值是单独的,每一个值都会有一个键与其关联,键标识相关的值。举个例子,从多辆车中读取到的时间编码车速表日志可以由车牌号码标识,就像下面一样:

AAA-123   65mph, 12:00pm
ZZZ-789   50mph, 12:02pm
AAA-123   40mph, 12:05pm
CCC-456   25mph, 12:15pm
...
mapping和reducing函数不是只接收数值(Values),而是(键,值)对。这些函数的每一个输出都是一样的:都是一个键和一个值,它们将被送到数据流的下一个列表。

对于Mapper和Reducer是如何工作的,MapReduce没有像其它语言那样严格。在更正式的函数式mapping和reducing设置 中,mapper针对每一个输入元素都要生成一个输出元素,reducer针对每一个输入列表都要生成一个输出元素。但在MapReduce中,每一个阶 段都可以生成任意的数值;mapper可能把一个输入map为0个,1个或100个输出。reducer可能计算超过一个的输入列表并生成一个或多个不同 的输出。

根据键划分reduce空间:reducing函数的作用是把大的数值列表转变为一个(或几个)输出数值。在MapReduce中,所有的输出数值一般不会被reduce在一起。有着相同键的所有数值会被一起送到一个reducer里。作用在有着不同键关联的数值列表上的reduce操作之间是独立执行的。



图4.3 不同颜色代表不同的键,有着相同键的数值都被传到同一个reduce任务里

(6)应用例子:词频统计(Word Count)
写一个简单的MapReduce程序就可以用来统计不同的词在一个文件集中出现的次数。比如,我们有这样的文件:
foo.txt: Sweet, this is the foo file
bar.txt: This is the bar file

我们期望输出会是这样子:

sweet 1
this  2
is    2
the   2
foo   1
bar   1
file  2
当然没问题,我们可以写一个MapReduce程序来计算得到这个输出。高层结构看起来会是这样子:

mapper (filename, file-contents):
for each word in file-contents:
emit (word, 1)

reducer (word, values):
sum = 0
for each value in values:
sum = sum + value
emit (word, sum)
列表4.1 MapReduce词频统计伪代码

若干个mapper函数的实例会被创建在我们的集群的不同机器上,每个实例接收一个不同的输入文件(这里假设我们有很多个文件)。 mappers输出的(word,1)键值对会被转到reducers那里去。若干个reducer方法实例也会在不同机子上被实例化。每个reducer负责处理关联到不同词的数值列表,数值列表中的值都是1;reducer把这些“1”值总和到一个关联了某个词的最终计数里。reducer然后生成最终的(word,count)输出,并把它写到一个输出文件里。

  针对这个,我们可以在Hadoop MapReduce中写一个很相似的程序;它被包括在Hadoop分发包中,具体在src/examples/org/apache/hadoop/examples/WordCount.java。它的部分代码如下:

public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}

/**
* A reducer class that just emits the sum of the input values.
*/
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}


列表4.2 MapReduce词频统计Java源码

实际Java实现与上述伪代码之间有一些微小的差别。首先,Java没有原生的emit关键字;你得到的OutputCollector输入对 象会接收数值并emit到下一执行阶段。第二,Hadoop使用的默认输入格式把输入文件的每一行作为mapper单独的一个输入,不是一次整个文件。其 中还使用了一个StringTokenizer对象用来把一行数据拆分为词组。这个操作没有对输入数据做任何规格化处理,所以 “cat”,“Cat”,“cat,”都被认为是不同的字符串。注意,类变量word在每一次mapper输出另外一个(word,1)键值对时都被重复使用;这个举措节省了为每个输出创建一个新的变量的时间。output.collect()方法会拷贝它收到的数值作为输入数据,所以你可以覆盖你使用的变量。

(7)驱动方法
Hadoop MapReduce程序的最后一个组件叫做Driver,它会初始化Job和指示Hadoop平台在输入文件集合上执行你的代码,并控制输出文件的放置地址。下面是Hadoop自带的Java实现例子里的一个整理版本driver的代码:

public void run(String inputPath, String outputPath) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MapClass.class);
conf.setReducerClass(Reduce.class);

FileInputFormat.addInputPath(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));

JobClient.runJob(conf);
}
列表4.3 Hadoop MapReduce词频统计驱动器

这个方法建立了一个在给定输入文件夹(inputPath参数)里的文件上执行词频统计程序的作业(Job)。reducers的输出被写到 outputath指定的文件夹内。用于运行job的配置信息保存在JobConf对象里。通过setMapperClass()和 setReducerClass()方法可以设定mapping和reducing函数。reducer生成的数据类型由 setOutputKeyClass()和setOutputValueClass()方法设定。默认情况下假定这些也是mapper的输出数据类型。如果你想设定不同的数据格式的话,可以通过JobConf的setMapOutputKeyClass()和 setMapOutputValueClass()方法设定。mapper的输入数据类型由InputFormat控制。输入格式在下面有详细的讨论。默认的输入格式是“TextInputFormat”,它会以(LongWritable,Text)键值对的方式加载数据。long值表示某一行在文件中的偏移量,Text对象则保存某一行的字符串内容。

通过调用JobClient.runJob(conf)即可向MapReduce提交job,这个调用会阻塞直到job完成。如果job失败了,它会抛出一个IOException。JobClient还提供了一个非阻塞版本的调用方法submitJob()。

2、MapReduce数据流

前面我们了解了一个基本的MapReduce作业由那些基本组件组成,从高层来看,所有的组件在一起工作时如下图所示:

图4.4高层MapReduce工作流水线

MapReduce的输入一般来自HDFS中的文件,这些文件分布存储在集群内的节点上。运行一个MapReduce程序会在集群的许多节点甚至所有节点上运行mapping任务,每一个mapping任务都是平等的:mappers没有特定“标识物”与其关联。因此,任意的mapper都可以处理任意的输入文件。每一个mapper会加载一些存储在运行节点本地的文件集来进行处理(译注:这是移动计算,把计算移动到数据所在节点,可以避免额外的数据传输开销)。
当mapping阶段完成后,这阶段所生成的中间键值对数据必须在节点间进行交换,把具有相同键的数值发送到同一个reducer那里。Reduce任务在集群内的分布节点同mappers的一样。这是MapReduce中唯一的任务节点间的通信过程。map任务间不会进行任何的信息交换,也不会去关心别的map任务的存在。相似的,不同的reduce任务之间也不会有通信。用户不能显式的从一台机器封送信息到另外一台机器;所有数据传送都是由Hadoop MapReduce平台自身去做的,这些是通过关联到数值上的不同键来隐式引导的。这是Hadoop MapReduce的可靠性的基础元素。如果集群中的节点失效了,任务必须可以被重新启动。如果任务已经执行了有副作用(side-effect)的操作,比如说,跟外面进行通信,那共享状态必须存在可以重启的任务上。消除了通信和副作用问题,那重启就可以做得更优雅些。
(1)近距离观察
在上一图中,描述了Hadoop MapReduce的高层视图。从那个图你可以看到mapper和reducer组件是如何用到词频统计程序中的,它们是如何完成它们的目标的。接下来,我们要近距离的来来看看这个系统以获取更多的细节。

图4.5细节化的Hadoop MapReduce数据流

图4.5展示了流线水中的更多机制。虽然只有2个节点,但相同的流水线可以复制到跨越大量节点的系统上。下去的几个段落会详细讲述MapReduce程序的各个阶段。

输入文件:文件是MapReduce任务的数据的初始存储地。正常情况下,输入文件一般是存在HDFS里。这些文件的格式可以是任意的;我们可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或其它一些格式。这些文件会很大—数十G或更大。
输入格式:InputFormat类定义了如何分割和读取输入文件,它提供有下面的几个功能:

* 选择作为输入的文件或对象;
* 定义把文件划分到任务的InputSplits;
* 为RecordReader读取文件提供了一个工厂方法;
Hadoop自带了好几个输入格式。其中有一个抽象类叫FileInputFormat,所有操作文件的InputFormat类都是从它那里继承功能和属性。当开启Hadoop作业时,FileInputFormat会得到一个路径参数,这个路径内包含了所需要处理的文件,FileInputFormat会读取这个文件夹内的所有文件(译注:默认不包括子文件夹内的),然后它会把这些文件拆分成一个或多个的InputSplit。你可以通过JobConf对象的setInputFormat()方法来设定应用到你的作业输入文件上的输入格式。下表给出了一些标准的输入格式:

输入格式
描述


TextInputFormat
默认格式,读取文件的行
行的字节偏移量
行的内容
KeyValueInputFormat
把行解析为键值对
第一个tab字符前的所有字符
行剩下的内容
SequenceFileInputFormat
Hadoop定义的高性能二进制格式
用户自定义
用户自定义
表4.1 MapReduce提供的输入格式

默认的输入格式是TextInputFormat,它把输入文件每一行作为单独的一个记录,但不做解析处理。这对那些没有被格式化的数据或是基于行的记录来说是很有用的,比如日志文件。更有趣的一个输入格式是KeyValueInputFormat,这个格式也是把输入文件每一行作为单独的一个记录。然而不同的是TextInputFormat把整个文件行当做值数据,KeyValueInputFormat则是通过搜寻tab字符来把行拆分为键值对。这在把一个MapReduce的作业输出作为下一个作业的输入时显得特别有用,因为默认输出格式(下面有更详细的描述)正是按KeyValueInputFormat格式输出数据。最后来讲讲SequenceFileInputFormat,它会读取特殊的特定于Hadoop的二进制文件,这些文件包含了很多能让Hadoop的mapper快速读取数据的特性。Sequence文件是块压缩的并提供了对几种数据类型(不仅仅是文本类型)直接的序列化与反序列化操作。Squence文件可以作为MapReduce任务的输出数据,并且用它做一个MapReduce作业到另一个作业的中间数据是很高效的。

输入块(InputSplit):一个输入块描述了构成MapReduce程序中单个map任务的一个单元。把一个MapReduce程序应用到一个数据集上,即是指一个作业,会由几个(也可能几百个)任务组成。Map任务可能会读取整个文件,但一般是读取文件的一部 分。默认情况下,FileInputFormat及其子类会以64MB(与HDFS的Block默认大小相同,译注:Hadoop建议Split大小与此 相同)为基数来拆分文件。你可以在hadoop-site.xml(译注:0.20.*以后是在mapred-default.xml里)文件内设定 mapred.min.split.size参数来控制具体划分大小,或者在具体MapReduce作业的JobConf对象中重写这个参数。通过以块形 式处理文件,我们可以让多个map任务并行的操作一个文件。如果文件非常大的话,这个特性可以通过并行处理大幅的提升性能。更重要的是,因为多个块 (Block)组成的文件可能会分散在集群内的好几个节点上(译注:事实上就是这样),这样就可以把任务调度在不同的节点上;因此所有的单个块都是本地处 理的,而不是把数据从一个节点传输到另外一个节点。当然,日志文件可以以明智的块处理方式进行处理,但是有些文件格式不支持块处理方式。针对这种情况,你可以写一个自定义的InputFormat,这样你就可以控制你文件是如何被拆分(或不拆分)成文件块的。自定义的文件格式在第五部分有描述。

输入格式定义了组成mapping阶段的map任务列表,每一个任务对应一个输入块。接着根据输入文件块所在的物理地址,这些任务会被分派到对应的系统节点上,可能会有多个map任务被分派到同一个节点上。任务分派好后,节点开始运行任务,尝试去最大并行化执行。节点上的最大任务并行数由 mapred.tasktracker.map.tasks.maximum参数控制。

记录读取器(RecordReader):InputSplit 定义了如何切分工作,但是没有描述如何去访问它。 RecordReader类则是实际的用来加载数据并把数据转换为适合mapper读取的键值对。RecordReader实例是由输入格式定义的,默认的输入格式,TextInputFormat,提供了一个LineRecordReader,这个类的会把输入文件的每一行作为一个新的值,关联到每一行 的键则是该行在文件中的字节偏移量。RecordReader会在输入块上被重复的调用直到整个输入块被处理完毕,每一次调用RecordReader都 会调用Mapper的map()方法。
Mapper:Mapper执行了MapReduce程序第一阶段中有趣的用户定义的工 作。给定一个键值对,map()方法会生成一个或多个键值对,这些键值对会被送到Reducer那里。对于整个作业输入部分的每一个map任务(输入块),每一个新的Mapper实例都会在单独的Java进程中被初始化,mapper之间不能进行通信。这就使得每一个map任务的可靠性不受其它map 任务的影响,只由本地机器的可靠性来决定。map()方法除了键值对外还会接收额外的两个参数(译注:在0.20.×后的版本,接口已变化,由Context对象代替这两个参数):
* OutputCollector对象有一个叫collect()的方法,它可以利用该方法把键值对送到作业的reduce阶段。
* Reporter对象提供当前任务的信息,它的getInputSplit()方法会返回一个描述当前输入块的对象,并且还允许map任务提供关 于系统执行进度的额外信息。setStatus()方法允许你生成一个反馈给用户的状态消息,incrCounter()方法允许你递增共享的高性能计数 器,除了默认的计数器外,你还可以定义更多的你想要的计数器。每一个mapper都可以递增计数器,JobTracker会收集由不同处理得到的递增数据 并把它们聚集在一起以供作业结束后的读取。
Partition & Shuffle:当第一个map任务完成后,节点可能还要继续执行更多的map任务,但这时候也开始把map任务的中间输出交换到需要它们的reducer那里去,这个移动map输出到reducer的过程叫做shuffle。每一个reduce节点会分派到中间输出的键集合中的一个不同的子集合,这些子集合(被称为“partitions”)是reduce任务的输入数据。每一个map任务生成的键值对可能会隶属于任意的partition,有着相同键的数值总是在一起被reduce,不管它是来自那个mapper的。因此,所有的map节点必须就把不同的中间数据发往何处达成一致。Partitioner类就是用来决定给定键值对的去向,默认的分类器(partitioner)会计算键的哈希值并基于这个结果来把键赋到相应的partition上,自定义的分类器在第五部分有详细描述。
排序:每一个reduce任务负责归约(reduceing)关联到相同键上的所有数值,每一个节点收到的中间键集合在被送到具体的reducer那里前就已经自动被Hadoop排序过了。
归约(Reduce):每 个reduce任务都会创建一个Reducer实例,这是一个用户自定义代码的实例,负责执行特定作业的第二个重要的阶段。对于每一个已赋予到reducer的partition内的键来说,reducer的reduce()方法只会调用一次,它会接收一个键和关联到键的所有值的一个迭代器,迭代器会以一个未定义的顺序返回关联到同一个键的值。reducer也要接收一个OutputCollector和Report对象,它们像在map()方 法中那样被使用。
输出格式:提供给OutputCollector的键值对会被写到输出文件中,写入的方式由输出格式控制。 OutputFormat的功能跟前面描述的InputFormat类很像,Hadoop提供的OutputFormat的实例会把文件写在本地磁盘或HDFS上,它们都是继承自公共的FileInputFormat类。每一个reducer会把结果输出写在公共文件夹中一个单独的文件内,这些文件的命名一般是part-nnnnn,nnnnn是关联到某个reduce任务的partition的id,输出文件夹通过FileOutputFormat.setOutputPath() 来设置。你可以通过具体MapReduce作业的JobConf对象的setOutputFormat()方法来设置具体用到的输出格式。下表给出了已提供的输出格式:

输出格式
描述
TextOutputFormat
默认的输出格式, 以 "key \t value" 的方式输出行
SequenceFileOutputFormat
输出二进制文件,适合于读取为子MapReduce作业的输入
NullOutputFormat
忽略收到的数据,即不做输出
表4.2 Hadoop提供的输出格式

Hadoop提供了一些OutputFormat实例用于写入文件,基本的(默认的)实例是TextOutputFormat,它会以一行一个键值对的方式把数据写入一个文本文件里。这样后面的MapReduce任务就可以通过 KeyValueInputFormat类简单的重新读取所需的输入数据了,而且也适合于人的阅读。还有一个更适合于在MapReduce作业间使用的中 间格式,那就是SequenceFileOutputFormat,它可以快速的序列化任意的数据类型到文件中,而对应 SequenceFileInputFormat则会把文件反序列化为相同的类型并提交为下一个Mapper的输入数据,方式和前一个Reducer的生 成方式一样。NullOutputFormat不会生成输出文件并丢弃任何通过OutputCollector传递给它的键值对,如果你在要 reduce()方法中显式的写你自己的输出文件并且不想Hadoop框架输出额外的空输出文件,那这个类是很有用的。
RecordWriter:这个跟InputFormat中通过RecordReader读取单个记录的实现很相似,OutputFormat类是RecordWriter对象的工厂方法,用来把单个的记录写到文件中,就像是OuputFormat直接写入的一样。
Reducer输出的文件会留在HDFS上供你的其它应用使用,比如另外一个MapReduce作业,或一个给人工检查的单独程序。

(2)额外的MapReduce功能

图4.6 插入了Combiner的MapReduce数据流

Combiner:前面展示的流水线忽略了一个可以优化MapReduce作业所使用带宽的步骤,这个过程叫Combiner,它在Mapper之后Reducer之前运行。 Combiner是可选的,如果这个过程适合于你的作业,Combiner实例会在每一个运行map任务的节点上运行。Combiner会接收特定节点上的Mapper实例的输出作为输入,接着Combiner的输出会被发送到Reducer那里,而不是发送Mapper的输出。Combiner是一个 “迷你reduce”过程,它只处理单台机器生成的数据。
词频统计是一个可以展示Combiner的用处的基础例子,上面的词频统计程序为每一个它看到的词生成了一个(word,1)键值对。所以如果在同一个文档内“cat”出现了3次,(”cat”,1)键值对会被生成3次,这些键值对会被 送到Reducer那里。通过使用Combiner,这些键值对可以被压缩为一个送往Reducer的键值对(”cat”,3)。现在每一个节点针对每一个词只会发送一个值到reducer,大大减少了shuffle过程所需要的带宽并加速了作业的执行。这里面最爽的就是我们不用写任何额外的代码就可以享用此功能!如果你的reduce是可交换及可组合的,那么它也就可以作为一个Combiner。你只要在driver中添加下面这行代码就可以在词频统计程序中启用Combiner。
conf.setCombinerClass(Reduce.class);
Combiner应是Reducer接口的实例,如果你的Reducer由于不可交换或不可组合不能作为Combiner,你仍可以写一个第三方类来作为你的作业的Combiner。

(3)容错性
使用Hadoop来运行你的作业的其中一个主要原因就是它的高容错性,就算在由高失败率的节点或网络组成的大集群内运行的作业,Hadoop都可以让作业成功完成。
Hadoop实现容错的主要方法就是重新执行任务,单个任务节点(TaskTracker)会不断的与系统的核心节点(JobTracker)进行通信,如果一个TaskTracker在一定时间内(默认是1分钟)无法与JobTracker进行通信,那JobTracker会假设这个TaskTracker出问题挂了,JobTracker了解给每个TaskTracker赋予了那些map和reduce任务。
如果作业仍然在mapping阶段,其它的TaskTracker会被要求重新执行所有的由前一个失败的TaskTracker所执行的map任务。如果作业在reduce阶段,则其它的TaskTracker会被要求重新执行所有的由前一个失败的TaskTracker所执行的reduce任务。
Reduce任务一旦完成会把数据写到HDFS。因此,如果一个TaskTracker已经完成赋予它的3个reduce任务中的2个,那只有第三个任务会被重新执行。Map任务则更复杂一点:即使一个节点已经完成了10个map任务,reducer仍可能无法获取这些map任务的所有的输出。如果此时节点挂了,那它的mapper输出就不可访问了。所以已经完成的map任务也必须被重新执行以使它们的输出结果对剩下的reducing机器可用,所有的这 些都是由Hadoop平台自动操作完成的。
这个容错性强调需要程序的执行没有副作用影响,如果Mapper和Reducer有自身的标识并和外部有通信,那重新执行一个任务可能需要其它节点去和新的map或reduce任务实例进行通信,并且重启的任务可能需要重建它们的中间状态。这个过程是很复杂的并且容易出错。MapReduce通过去除任务标识或任务间的通信而大大简化了这个问题。单个任务只能看到它自己的输入和输出,这样就使得错误与重启过程变成清晰可靠。
推测性的执行(Speculative execution):Hadoop系统有一个问题,它把任务分派到很多个节点,其中很有可能有一些慢的节点会限制剩下程序的执行速度。举个例子,如果有个节点内有一个比较慢的磁盘控制器,那它读取输入数据的速度可能只有所有其它节点的速度的10%。所以当99个map任务都已经完成了,系统仍在等待最后那个比较耗时的map任务完成。
通过强迫任务独立运行于其它的任务,使得单个任务之间不会知道它们的输入数据来自哪里。任务相信Hadoop平台会派送合适的输入到它们那里。因此,对于相同的输入数据, 我们可以并行多次处理以利用不同机器的负载能力。因为作业中大多数的任务都已经完成了,Hadoop平台会在几个空闲的节点上调度执行剩余任务的拷贝,这个过程叫做推测性的执行。当任务完成时,它会向JobTracker通告。任何一个首先完成的拷贝任务将成为权威拷贝,如果其他拷贝任务还在推测性的执行中,Hadoop会告诉TaskTracker去终止这些任务并丢弃它们的输出,接着Reducer会从首先完成的Mapper那里获取输入数据。
推测性的执行默认是启用的,你可以通过设置JobConf中的mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution为false来禁用mapper和reducer的推测性的执行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: