[转载] MapReduce中TextInputFormat分片和读取分片数据源码级分析
2015-12-22 10:21
253 查看
转载自: http://www.cnblogs.com/lxf20061900/p/3810977.html
InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:
(1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split;
(2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用。
InputFormat有两个比较重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。这两个方法分别对应上面的两个功能。
InputSplit分片信息有两个特点:(1)是逻辑分片,只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,只记录一些元信息,比如起始位置、长度以及所在的节点列表等;(2)必须可序列化,分片信息要上传到HDFS文件,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。
RecordReader对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value,会作为MapTask的map方法的输入。
我们本节就以最长使用的TextInputFormat为列来讲解分片和读取分片数据。
先看继承关系:(1)public class TextInputFormat extends FileInputFormat;(2)public abstract class FileInputFormat<K, V> extends InputFormat;(3)public abstract class InputFormat。最顶的父类InputFormat只有两个未实现的抽象方法getSplits和createRecordReader;而FileInputFormat包含的方法比较多,如下图:
View Code
这个方法会控制split的读取数据的结束位置,上面的readLine方法只关注输入流不会管split的大小的。需要注意的是其中的while循环,其中的pos和end表示当前在文件中的偏移量和split的结束位置,即使这个split的最后一行跨split也会完整的获取一行。也就保证了一个记录的完整性。mapper获取key/value会通过调用getCurrentKey()和getCurrentValue()来达到的,但是调用这俩方法前得先调用nextKeyValue()方法才能实现key和value的赋值。
到这我们回头看看上面的那个特殊问题,就是split的大小超过block的大小数据读取的问题,我们前面已经讲过split是逻辑分片,不是物理分片,当MapTask的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能还在本机也可能不在本机,如果是后者的话就要从别的节点拉数据过来,因为实际获取数据是一个输入流,这个输入流面向的是整个文件,不受什么block啊、split的影响,split的大小越大可能需要从别的节点拉的数据越多,从从而效率也会越慢,拉数据的多少是由getSplits方法中的splitSize决定的。所以为了更有效率,应该遵循上面的黑体字。
至此,TextInputFormat的分片和数据读取过程讲完了。这只是一个例子,其他InputFormat可以参考这个。
参考:1、董西成,《hadoop技术内幕---深入理解MapReduce架构设计与实现原理》
2、http://www.cnblogs.com/clarkchen/archive/2011/06/02/2068609.html
InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:
(1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split;
(2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用。
InputFormat有两个比较重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。这两个方法分别对应上面的两个功能。
InputSplit分片信息有两个特点:(1)是逻辑分片,只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,只记录一些元信息,比如起始位置、长度以及所在的节点列表等;(2)必须可序列化,分片信息要上传到HDFS文件,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。
RecordReader对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value,会作为MapTask的map方法的输入。
我们本节就以最长使用的TextInputFormat为列来讲解分片和读取分片数据。
先看继承关系:(1)public class TextInputFormat extends FileInputFormat;(2)public abstract class FileInputFormat<K, V> extends InputFormat;(3)public abstract class InputFormat。最顶的父类InputFormat只有两个未实现的抽象方法getSplits和createRecordReader;而FileInputFormat包含的方法比较多,如下图:
public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }
View Code
这个方法会控制split的读取数据的结束位置,上面的readLine方法只关注输入流不会管split的大小的。需要注意的是其中的while循环,其中的pos和end表示当前在文件中的偏移量和split的结束位置,即使这个split的最后一行跨split也会完整的获取一行。也就保证了一个记录的完整性。mapper获取key/value会通过调用getCurrentKey()和getCurrentValue()来达到的,但是调用这俩方法前得先调用nextKeyValue()方法才能实现key和value的赋值。
到这我们回头看看上面的那个特殊问题,就是split的大小超过block的大小数据读取的问题,我们前面已经讲过split是逻辑分片,不是物理分片,当MapTask的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能还在本机也可能不在本机,如果是后者的话就要从别的节点拉数据过来,因为实际获取数据是一个输入流,这个输入流面向的是整个文件,不受什么block啊、split的影响,split的大小越大可能需要从别的节点拉的数据越多,从从而效率也会越慢,拉数据的多少是由getSplits方法中的splitSize决定的。所以为了更有效率,应该遵循上面的黑体字。
至此,TextInputFormat的分片和数据读取过程讲完了。这只是一个例子,其他InputFormat可以参考这个。
参考:1、董西成,《hadoop技术内幕---深入理解MapReduce架构设计与实现原理》
2、http://www.cnblogs.com/clarkchen/archive/2011/06/02/2068609.html
相关文章推荐
- 一般路由设置
- 面向对象个人总结其三 多态(一)
- mysql-5.7.10-winx64免安装配置方法以及mysql服务无法启动 服务没有报告任何错误解决办法
- android 百度地图定位总结
- cmd下传文件方法记录
- 使用Jenkins配置Git+Maven的自动化构建
- 数据结构总结-焦梦真
- redis
- redis
- 将博客搬至CSDN
- Linux top 命令各字段解释 .
- Linux建立svn版本库
- 以当前默认file encoding读取文件然后以utf-8写入新文件
- 农林植保无人机
- C++中的运算符重载
- 12月19日全球域名商保有量及市场份额排行榜16强
- 兼容 Android 4.4 透明状态栏与导航栏
- IP/TCP/UPD 头结构详解
- xshell putty等终端,如何记录终端输入输出
- 白话经典算法系列之五 归并排序的实现