您的位置:首页 > 其它

MR过程

2015-10-19 23:39 197 查看
Hadoop – MapReduce过程

昨天我们对MapReduce作了大概了解,知道它如何进行数据处理。今天我们走进MapReduce,分析MapReduce源代码,看看它到底是如何实现的,我们会根据数据流动的顺序来进行分析 :-)


1.读取数据(InputFormat)

读取数据由InputFormat类完成。InputFormat类的功能如下:

验证作业的输入格式

由InputSplit类将指定输入目录中的输入文件拆分成逻辑文件,即key/value序对,每个逻辑文件将被分配到一个Mapper中

提供RecordReader实现类,从逻辑文件中搜集输入记录给Mapper类进行处理


InputFormat类 定义了两个抽象方法:getSplits和createRecordReader,前者根据作业(job)的配置,将输入文件切片并返回一个 InputSplit类型的数组;后者为InputSplit实例对象生成一个RecordReader对象,在InputSplit对象使用 前,MapReduce框架会先进行RecordReader的实例化操作。InputFormat类图如Figure1:

Hadoop – MapReduce过程

Figure1:InputFormat类图

InputSplit类定义了两个抽象方法:getLength和getLocations,getLength方法获取分片的大小,因此能根据输入分片的大小进行排序;getLocations方法获取输入数据所在节点的名称。InputSplit类图如Figure2:

Hadoop – MapReduce过程

Figure2:InputSplit类图

RecordReader类实现了Closeable接口,共6个抽象方法:initialize,nextKeyValue,getCurrentKey,getCurrentValue,getProgress和close。RecordReader类图如下:

Hadoop – MapReduce过程

Figure3:RecordReader类图

initialize:调用RecordReader对象时进行实例化;

nextKeyValue:读取下一个key/value对,读取成功返回true;

getCurrentKey:获取当前key值,返回当前key值,如果没有key值返回null;

getCurrentValue:获取当前value,即记录。

getProgress:获取当前RecordReader处理数据的进度,返回数值为0.0~1.0

close:关闭RecordReader


RecordReader将输入数据切片并转换成key/value对,该key/value对作为Mapper的输入。一般情况下,输入文件分片后是一组连续的记录,最少有N个分片(N是输入文件数)。用户需要控制好分片数,因为分片的大小和数量对作业的运作性能影响很大。

在使用InputFormat类时,需要指定数据的输入格式。Hadoop Map-Reduce框架提供了大量输入格式可供选择,主要区别在于文本输入格式(textual input format)和二进制输入格式(binary input format),下面列出几个较常用的格式:

KeyValueTextInputFormat:每行一对key/value序对;

TextInputFormat:key值为行号,value为行的内容;

NLineInputFormat:与KeyValueTextInputFormat类似,不同点是NLineInputFormat根据输入文件的行数分片,KeyValueTextInputFormat根据输入文件字节数分片;

MultiFileInputFormat:一个抽象类,用户可以通过实现该类,将多个文件聚集到一个分片中;

SequenceFileInputFormat:输入文件为Hadoop序列文件,其中包含序列化的key/value对。


KeyValueTextInputFormat和SequenceFileInputFormat格式是最常用的输入格式。可使用如下格式进行设置:

首先定义一个JobConf类对象(当然该对象需要初始化):

JobConf conf;

然后通过conf调用setInputFormat设置输入数据的格式:

conf.setInputFormat(KeyValueTextInputFormat.class);

最后设置输入目录的路径:

FileInputFormat.setInputPaths(conf,MapReduceConfig.getInputDirectionary() );

由InputFormat类图可以看到,创建记录读取器时还需要TaskAttemptContext对象,获取分片时需要JobContext对 象,它们分别是获取任务(Task)和作业(Job)的信息。因为每一个文件的分片对应于一个任务(task),而getSplits方法返回的是 InputSplit类数组,用于完成整个作业(job),因此调用getSplits方法需要提供job的配置信息。这两个类我们后面再进行详细分析。 现在我们来总结一下读取数据需要做些什么:

指定输入文件的目录

指定一个类用于读取数据并将该数据转换成key/value对


到此读取数据阶段完成,接下来进入Map阶段,我们拭目以待吧 :-)

2.Map阶段

完成读取数据操作后,现在我们可以将分片后的数据作为Mapper的输入,进入Map阶段。先来看看Mapper类中包含什么东西,如图Figure4:

Hadoop – MapReduce过程

Figure4:Mapper类图

Maps是用于将输入记录转换成中间记录的一个个独立的任务。这些中间记录不需要与输入记录保持相同数据类型。一个给定的输入
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: