您的位置:首页 > 其它

MapReduce源码解析之InputFormat

2016-09-10 19:45 267 查看
一直断断续续有研读MapReduce源码,但是没有系统的整理过,后面打算分N篇笔记记录下整个MR的源码剖析,这里先不分析各种设计架构,打算在所有源码阅读结束之后再整理下整体架构。如果对MapReduce不了解的同学,请自行百度大体架构。

HDFS当中存储着Map需要的源文件,HDFS又是以block为存储单元进行文件存储的,而Map实际上需要的是KV对输入,所以这中间必然需要一个转换的过程,这个过程应该怎么去抽象以及设计,才能比较灵活的承担起文件存储到KV对输入的转换。在实现MR程序的时候,想必都使用过相应的InputFormat实现类。其实InputFormat主要做两件事情:1. 数据切分,按照一定策略将输入切分成多个split,确定Map的数量以及与之对应的split
  2. 为Map提供输入数据,给定某个split,能够正确获取KV对。下图列举了几个常用的InputFormat的实现,源码解析主要侧重InputFormat和FileInputFormat。

其中InputFormat是一个抽象类,里面定义了两个方法:一个定义了如何将File生成InputSplit,另外一个定义了如何将InputSplit转换成KV对。

下面直接看FileInputFormat的源码实现,源码直接列出,里面采用注释的方式,同时也是自己记笔记的过程。

public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
//获取最小数量,至少为1,参数可配置:mapreduce.input.fileinputformat.split.minsize
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//获取最大数量,最大为Long.MAX,参数可配置:mapreduce.input.fileinputformat.split.maxsize"
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
//应该不存在非LocatedFileStatus?子类重载
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 子类重载
if (isSplitable(job, path)) {
//获取blockSize
long blockSize = file.getBlockSize();
//max(minSize, min(maxSize, blockSize))
//最大为blockSize,否则才是用户配置的,理解有误,最大为minSize
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;
//超过splitSize1.1倍
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
//定位到其中某一个block,否则越界
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
//splitSize设置一定不能:1<= length/splitSize <= 1.1,否则无效
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
其中listStatus代码解读如下:

protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}

// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());

// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);

// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
//默认的过滤规则,文件开头不能是_或者.
filters.add(hiddenFileFilter);
//获取用户自定义文件过滤规则,参数配置:mapreduce.input.path
9776
Filter.class
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);

List<FileStatus> result = null;
//默认处理线程数1,可配置
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
Stopwatch sw = new Stopwatch().start();
if (numThreads == 1) {
//单线程处理
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
//多线程处理相对比较复杂
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}

sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
}
LOG.info("Total input paths to process : " + result.size());
return result;
}


FileInputFormat并未实现第二个如何进行KV转化的接口,阅读下TextInputFormat实现:

@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
//获取分隔符
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
//返回行读取迭代器
return new LineRecordReader(recordDelimiterBytes);
}

@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
//是否是可分割的压缩方式
return codec instanceof SplittableCompressionCodec;
}


其中设计比较重要的是LineRecordReader,类图如下:

实现了迭代器的功能,代码涉及文件读写细节比较多,这里不赘述了。

其实InputFormat实现的就是开头讲到的两大功能,根据文件格式和类型(包括压缩)的不同来实现不同的读取细节。但是最终都是转化为KV对。

此外其实默认情况下就是:一个block对应一个InputSplit,一个InpuSplit对应一个Map,如果要控制Map数量,那么调大minSize,Map数量减少,调小maxSize,Map数量增多。

后续再细读下listStatus方法当中多线程获取fileStatus那段逻辑,比较绕,没太看懂。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: