您的位置:首页 > 其它

[转载] MapReduce中TextInputFormat分片和读取分片数据源码级分析

2015-12-22 10:21 253 查看
转载自: http://www.cnblogs.com/lxf20061900/p/3810977.html
InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:

  (1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split;

  (2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用。

  InputFormat有两个比较重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。这两个方法分别对应上面的两个功能。

  InputSplit分片信息有两个特点:(1)是逻辑分片,只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,只记录一些元信息,比如起始位置、长度以及所在的节点列表等;(2)必须可序列化,分片信息要上传到HDFS文件,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。

  RecordReader对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value,会作为MapTask的map方法的输入。

  我们本节就以最长使用的TextInputFormat为列来讲解分片和读取分片数据。

  先看继承关系:(1)public class TextInputFormat extends FileInputFormat;(2)public abstract class FileInputFormat<K, V> extends InputFormat;(3)public abstract class InputFormat。最顶的父类InputFormat只有两个未实现的抽象方法getSplits和createRecordReader;而FileInputFormat包含的方法比较多,如下图:

public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}

// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}


View Code
  这个方法会控制split的读取数据的结束位置,上面的readLine方法只关注输入流不会管split的大小的。需要注意的是其中的while循环,其中的pos和end表示当前在文件中的偏移量和split的结束位置,即使这个split的最后一行跨split也会完整的获取一行。也就保证了一个记录的完整性。mapper获取key/value会通过调用getCurrentKey()和getCurrentValue()来达到的,但是调用这俩方法前得先调用nextKeyValue()方法才能实现key和value的赋值。

  

  到这我们回头看看上面的那个特殊问题,就是split的大小超过block的大小数据读取的问题,我们前面已经讲过split是逻辑分片,不是物理分片,当MapTask的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能还在本机也可能不在本机,如果是后者的话就要从别的节点拉数据过来,因为实际获取数据是一个输入流,这个输入流面向的是整个文件,不受什么block啊、split的影响,split的大小越大可能需要从别的节点拉的数据越多,从从而效率也会越慢,拉数据的多少是由getSplits方法中的splitSize决定的。所以为了更有效率,应该遵循上面的黑体字。

  至此,TextInputFormat的分片和数据读取过程讲完了。这只是一个例子,其他InputFormat可以参考这个。

参考:1、董西成,《hadoop技术内幕---深入理解MapReduce架构设计与实现原理》

     2、http://www.cnblogs.com/clarkchen/archive/2011/06/02/2068609.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: