您的位置:首页 > 运维架构

hadoop各种输入方法(InputFormat)汇总

2014-05-16 00:32 183 查看
知道了getsplit和getrecordreader是干嘛的

转自http://www.blogjava.net/shenh062326/archive/2012/07/03/382115.html

mapreduce中,一个job的map个数,每个map处理的数据量是如何决定的呢?另外每个map又是如何读取输入文件的内容呢?用户是否可以自己决定输入方式,决定map个数呢?这篇文章将详细讲述hadoop中各种InputFormat的功能和如何编写自定义的InputFormat.


简介: mapreduce作业会根据输入目录产生多个map任务,通过多个map任务并行执行来提高作业运行速度,但如果map数量过少,并行量低,作业执行慢,如果map数过多,资源有限,也会增加调度开销.因此,根据输入产生合理的map数,为每个map分配合适的数据量,能有效的提升资源利用率,并使作业运行速度加快.

在mapreduce中,每个作业都会通过InputFormat来决定map数量.InputFormat是一个接口,提供两个方法:

InputSplit[] getSplits(JobConf job,int
numSplits)throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter)throws
IOException;

其中getSplits方法会根据输入目录产生InputSplit数组,每个InputSplit会相应产生一个map任务,
map的输入定义在InputSplit中. getRecordReader方法返回一个RecordReader对象,
RecordReader决定了map任务如何读取输入数据,例如一行一行的读取还是一个字节一个字节的读取,等等.

下图是InputFormat的实现类:

(暂时无法上传)

这理详细介绍FileInputFormat和CombineFileInputFormat,其它不常用,有兴趣的可以自己查看hadoop源码.



FileInputFormat(旧接口org.apache.hadoop.mapred)



mapreduce默认使用TextInputFormat,TextInputFormat没有实现自己的getSplits方法,它继承于FileInputFormat,因此使用了FileInputFormat的.
org.apache.hadoop.mapred.FileInputFormat的getSplits流程:
两个配置
mapred.min.split.size (一个map最小输入长度),
mapred.map.tasks (推荐map数量)
如何决定每个map输入长度呢?首先获取输入目录下所有文件的长度和,除以mapred.map.tasks得到一个推荐长度goalSize,然后通过式子:
Math.max(minSize, Math.min(goalSize, blockSize))决定map输入长度.这里的minSize为mapred.min.split.size,
blockSize为相应文件的block长度.这式子能保证一个map的输入至少大于mapred.min.split.size,对于推荐的map长度,只有它的长度小于blockSize且大于mapred.min.split.size才会有效果.由于mapred.min.split.size默认长度为1,因此通常情况下只要小于blockSize就有效果,否则使用blockSize做为map输入长度.
因此,如果想增加map数,可以把mapred.min.split.size调小(其实默认值即可),另外还需要把mapred.map.tasks设置大.
如果需要减少map数,可以把mapred.min.split.size调大,另外把mapred.map.tasks调小.
这里要特别指出的是FileInputFormat会让每个输入文件至少产生一个map任务,因此如果你的输入目录下有许多文件,而每个文件都很小,例如几十kb,那么每个文件都产生一个map会增加调度开销.作业变慢.
那么如何防止这种问题呢?CombineFileInputFormat能有效的减少map数量.

FileInputFormat(新接口org.apache.hadoop.mapreduce.lib.input)

Hadoop 0.20开始定义了一套新的mapreduce编程接口,使用新的FileInputFormat,它与旧接口下的FileInputFormat主要区别在于,它不再使用mapred.map.tasks,而使用mapred.max.split.size参数代替goalSize,通过Math.max(minSize,
Math.min(maxSize, blockSize))决定map输入长度,一个map的输入要大于minSize,小于
Math.min(maxSize, blockSize).

若需增加map数,可以把mapred.min.split.size调小,把mapred.max.split.size调大.若需减少map数,可以把mapred.min.split.size调大,并把mapred.max.split.size调小.

CombineFileInputFormat

顾名思义, CombineFileInputFormat的作用是把许多文件合并作为一个map的输入.
在它之前,可以使用MultiFileInputFormat,不过其功能太简单,它以文件为单位,一个文件至多分给一个map处理,如果某个目录下有许多小文件,
另外还有一个超大文件,处理大文件的map会严重偏慢.
CombineFileInputFormat是一个被推荐使用的InputFormat.它有三个配置:
mapred.min.split.size.per.node,一个节点上split的至少的大小
mapred.min.split.size.per.rack 一个交换机下split至少的大小
mapred.max.split.size 一个split最大的大小
它的主要思路是把输入目录下的大文件分成多个map的输入,并合并小文件,做为一个map的输入.具体的原理是下述三步:
1.根据输入目录下的每个文件,如果其长度超过mapred.max.split.size,以block为单位分成多个split(一个split是一个map的输入),每个split的长度都大于mapred.max.split.size,因为以block为单位,因此也会大于blockSize,此文件剩下的长度如果大于mapred.min.split.size.per.node,则生成一个split,否则先暂时保留.
2.现在剩下的都是一些长度效短的碎片,把每个rack下碎片合并,只要长度超过mapred.max.split.size就合并成一个split,最后如果剩下的碎片比mapred.min.split.size.per.rack大,就合并成一个split,否则暂时保留.
3.把不同rack下的碎片合并,只要长度超过mapred.max.split.size就合并成一个split,剩下的碎片无论长度,合并成一个split.
举例: mapred.max.split.size=1000
mapred.min.split.size.per.node=300
mapred.min.split.size.per.rack=100
输入目录下五个文件,rack1下三个文件,长度为2050,1499,10,
rack2下两个文件,长度为1010,80.另外blockSize为500.
经过第一步,生成五个split:
1000,1000,1000,499,1000.剩下的碎片为rack1下:50,10;
rack2下10:80
由于两个rack下的碎片和都不超过100,所以经过第二步,
split和碎片都没有变化.
第三步,合并四个碎片成一个split,长度为150.

如果要减少map数量,可以调大mapred.max.split.size,否则调小即可.
其特点是:一个块至多作为一个map的输入,一个文件可能有多个块,一个文件可能因为块多分给做为不同map的输入,一个map可能处理多个块,可能处理多个文件。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: