MapReduce源码解析之InputFormat
2016-09-10 19:45
267 查看
一直断断续续有研读MapReduce源码,但是没有系统的整理过,后面打算分N篇笔记记录下整个MR的源码剖析,这里先不分析各种设计架构,打算在所有源码阅读结束之后再整理下整体架构。如果对MapReduce不了解的同学,请自行百度大体架构。
HDFS当中存储着Map需要的源文件,HDFS又是以block为存储单元进行文件存储的,而Map实际上需要的是KV对输入,所以这中间必然需要一个转换的过程,这个过程应该怎么去抽象以及设计,才能比较灵活的承担起文件存储到KV对输入的转换。在实现MR程序的时候,想必都使用过相应的InputFormat实现类。其实InputFormat主要做两件事情:1. 数据切分,按照一定策略将输入切分成多个split,确定Map的数量以及与之对应的split
2. 为Map提供输入数据,给定某个split,能够正确获取KV对。下图列举了几个常用的InputFormat的实现,源码解析主要侧重InputFormat和FileInputFormat。
其中InputFormat是一个抽象类,里面定义了两个方法:一个定义了如何将File生成InputSplit,另外一个定义了如何将InputSplit转换成KV对。
下面直接看FileInputFormat的源码实现,源码直接列出,里面采用注释的方式,同时也是自己记笔记的过程。
FileInputFormat并未实现第二个如何进行KV转化的接口,阅读下TextInputFormat实现:
其中设计比较重要的是LineRecordReader,类图如下:
实现了迭代器的功能,代码涉及文件读写细节比较多,这里不赘述了。
其实InputFormat实现的就是开头讲到的两大功能,根据文件格式和类型(包括压缩)的不同来实现不同的读取细节。但是最终都是转化为KV对。
此外其实默认情况下就是:一个block对应一个InputSplit,一个InpuSplit对应一个Map,如果要控制Map数量,那么调大minSize,Map数量减少,调小maxSize,Map数量增多。
后续再细读下listStatus方法当中多线程获取fileStatus那段逻辑,比较绕,没太看懂。
HDFS当中存储着Map需要的源文件,HDFS又是以block为存储单元进行文件存储的,而Map实际上需要的是KV对输入,所以这中间必然需要一个转换的过程,这个过程应该怎么去抽象以及设计,才能比较灵活的承担起文件存储到KV对输入的转换。在实现MR程序的时候,想必都使用过相应的InputFormat实现类。其实InputFormat主要做两件事情:1. 数据切分,按照一定策略将输入切分成多个split,确定Map的数量以及与之对应的split
2. 为Map提供输入数据,给定某个split,能够正确获取KV对。下图列举了几个常用的InputFormat的实现,源码解析主要侧重InputFormat和FileInputFormat。
其中InputFormat是一个抽象类,里面定义了两个方法:一个定义了如何将File生成InputSplit,另外一个定义了如何将InputSplit转换成KV对。
下面直接看FileInputFormat的源码实现,源码直接列出,里面采用注释的方式,同时也是自己记笔记的过程。
public List<InputSplit> getSplits(JobContext job) throws IOException { Stopwatch sw = new Stopwatch().start(); //获取最小数量,至少为1,参数可配置:mapreduce.input.fileinputformat.split.minsize long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //获取最大数量,最大为Long.MAX,参数可配置:mapreduce.input.fileinputformat.split.maxsize" long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { BlockLocation[] blkLocations; //应该不存在非LocatedFileStatus?子类重载 if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } // 子类重载 if (isSplitable(job, path)) { //获取blockSize long blockSize = file.getBlockSize(); //max(minSize, min(maxSize, blockSize)) //最大为blockSize,否则才是用户配置的,理解有误,最大为minSize long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; //超过splitSize1.1倍 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //定位到其中某一个block,否则越界 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } //splitSize设置一定不能:1<= length/splitSize <= 1.1,否则无效 if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits; }其中listStatus代码解读如下:
protected List<FileStatus> listStatus(JobContext job ) throws IOException { Path[] dirs = getInputPaths(job); if (dirs.length == 0) { throw new IOException("No input paths specified in job"); } // get tokens for all the required FileSystems.. TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration()); // Whether we need to recursive look into the directory structure boolean recursive = getInputDirRecursive(job); // creates a MultiPathFilter with the hiddenFileFilter and the // user provided one (if any). List<PathFilter> filters = new ArrayList<PathFilter>(); //默认的过滤规则,文件开头不能是_或者. filters.add(hiddenFileFilter); //获取用户自定义文件过滤规则,参数配置:mapreduce.input.path 9776 Filter.class PathFilter jobFilter = getInputPathFilter(job); if (jobFilter != null) { filters.add(jobFilter); } PathFilter inputFilter = new MultiPathFilter(filters); List<FileStatus> result = null; //默认处理线程数1,可配置 int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS, DEFAULT_LIST_STATUS_NUM_THREADS); Stopwatch sw = new Stopwatch().start(); if (numThreads == 1) { //单线程处理 result = singleThreadedListStatus(job, dirs, inputFilter, recursive); } else { Iterable<FileStatus> locatedFiles = null; try { //多线程处理相对比较复杂 LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher( job.getConfiguration(), dirs, recursive, inputFilter, true); locatedFiles = locatedFileStatusFetcher.getFileStatuses(); } catch (InterruptedException e) { throw new IOException("Interrupted while getting file statuses"); } result = Lists.newArrayList(locatedFiles); } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis()); } LOG.info("Total input paths to process : " + result.size()); return result; }
FileInputFormat并未实现第二个如何进行KV转化的接口,阅读下TextInputFormat实现:
@Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { //获取分隔符 String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); //返回行读取迭代器 return new LineRecordReader(recordDelimiterBytes); } @Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } //是否是可分割的压缩方式 return codec instanceof SplittableCompressionCodec; }
其中设计比较重要的是LineRecordReader,类图如下:
实现了迭代器的功能,代码涉及文件读写细节比较多,这里不赘述了。
其实InputFormat实现的就是开头讲到的两大功能,根据文件格式和类型(包括压缩)的不同来实现不同的读取细节。但是最终都是转化为KV对。
此外其实默认情况下就是:一个block对应一个InputSplit,一个InpuSplit对应一个Map,如果要控制Map数量,那么调大minSize,Map数量减少,调小maxSize,Map数量增多。
后续再细读下listStatus方法当中多线程获取fileStatus那段逻辑,比较绕,没太看懂。
相关文章推荐
- MapReduce源码解析之InputFormat(二)
- MapReduce 的格式输入----SequenceFileInputFormat ---源码分析
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- MapReduce源码分析之InputFormat
- MapReduce的输入格式---KeyValueTextInputFormat---源码分析
- MapReduce之数据读取组件InputFormat原理解析
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- MapReduce源码之InputFormat
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- MapReduce数据输入中InputFormat类源码解析
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- MapReduce源码之FileInputFormat
- Hadoop源码学习之-----Mapreduce输入流:InputFormat,InputSplit,RecordReader
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- MapReduce源码分析之InputFormat
- Hadoop源码解析之: TextInputFormat如何处理跨split的行