A Detailed Look at CombineFileInputFormat in Hadoop
In MapReduce practice we often deal with many small files. Each file produces its own mapper, which wastes resources; and if there is no reduce stage afterwards, the job also emits many small output files, so the file count explodes and hurts downstream Hive jobs.
So we need the mapper input to combine multiple files into a single split; CombineFileInputFormat meets exactly this need.
How CombineFileInputFormat works (a summary borrowed from an analysis found online):
First pass: generate splits from blocks that reside on the same DataNode (DN). The procedure:
1. Iterate over nodeToBlocks to find out which blocks live on each DN.
2. Loop over that DN's list of blocks.
3. Remove each block from blockToNodes so that the same block is not included in more than one split.
4. Add the block to a list of valid blocks (validBlocks); this list keeps track of which blocks have been removed from blockToNodes so they can be put back later if necessary.
5. Add the block's length to the temporary variable curSplitSize.
6. Check whether curSplitSize has reached the configured maxSize:
a) If so, create and add a split, then reset curSplitSize and validBlocks.
b) If not, keep looping over the block list (back to step 2).
7. When the DN's block list is exhausted, check whether the leftover blocks are allowed to form a split (i.e. whether their combined size is at least the per-node minimum split size):
a) If so, create and add a split.
b) If not, return the leftover blocks to blockToNodes.
8. Reset the temporaries.
9. Go back to step 1 for the next DN.
// process all nodes and create splits that are local
// to a node.
// (build splits from blocks that live on the same DN)
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
     nodeToBlocks.entrySet().iterator(); iter.hasNext();) {

  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from
  // blockToNodes so that the same block does not appear in
  // two different splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        // (combine the accumulated blocks into one split and append it to the split list)
        addCreatedSplit(job, splits, nodes, validBlocks);
        // reset
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }
  // if there were any blocks left over and their combined size is
  // larger than minSplitNode, then combine them into one split.
  // Otherwise add them back to the unprocessed pool. It is likely
  // that they will be combined with other blocks from the same rack later on.
  // The comment above already explains it; in my own words, two cases can happen here:
  /**
   * 1. The DN still has blocks that were not placed in a split, and their
   *    combined size is at least the per-node minimum (without reaching the
   *    maximum), so they are combined into one split.
   * 2. The leftover blocks are still too small, so they are returned to
   *    blockToNodes and handled together later.
   */
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}
Second pass: combine blocks that are not on the same DN but are on the same rack (only the blocks left over from the first pass):
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
// At this point we are no longer dealing with same-DN blocks; those were
// handled by the code above. Only blocks that have not yet been placed
// in a split are processed here.
while (blockToNodes.size() > 0) {

  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time, Combine all possible blocks that
  // reside on this rack as one split. (constrained by minimum and maximum
  // split size).

  // iterate over all racks
  // (build rack-local splits)
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
       rackToBlocks.entrySet().iterator(); iter.hasNext();) {

    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      // Important: blockToNodes now contains exactly the blocks
      // that have not been placed in a split yet.
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated split size exceeds the maximum, then
        // create this split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    // There are still blocks without a split. If their combined size is at
    // least the per-rack minimum, create a split; otherwise keep them for
    // later processing.
    if (!validBlocks.isEmpty()) {
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split
        // otherwise, store these blocks into overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in 'overflow' block list. These will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}
Finally, blocks that sit on neither the same DN nor the same rack (whatever remains after the first two passes) are combined; there is nothing special in that part of the source.
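For reference, here is a rough sketch of what that final pass does, following the same accumulate-and-cut pattern as the two snippets above. The field and helper names (overflowBlocks, OneBlockInfo.racks, getHosts, addCreatedSplit) mirror the earlier snippets; the exact code in your Hadoop version may differ.

// Hedged sketch: combine the remaining overflow blocks, cutting a split
// whenever the accumulated size reaches maxSize, then emit whatever is
// left at the very end as one final split.
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // collect the racks these blocks live on, as locality hints for the split
  for (int i = 0; i < oneblock.racks.length; i++) {
    if (!racks.contains(oneblock.racks[i])) {
      racks.add(oneblock.racks[i]);
    }
  }

  // cut a split whenever the accumulated size reaches the maximum
  if (maxSize != 0 && curSplitSize >= maxSize) {
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

// whatever is still left becomes one last split, regardless of its size
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}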
Source code summary:
The merge goes through three stages: same DN, then same rack but different DNs, then different racks.
Blocks that can be combined are placed into the same split.
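The cut points used in those three stages are controlled by three values: maxSize, minSizeNode and minSizeRack. Below is a minimal sketch of setting them through the job configuration, assuming the classic mapred.* property names that CombineFileInputFormat falls back to (subclasses can also call setMaxSplitSize(), setMinSplitSizeNode() and setMinSplitSizeRack()); the sizes are illustrative only:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// maximum size of a combined split; 0 (the default) means no upper limit,
// as seen in the "maxSize != 0" checks above
conf.set("mapred.max.split.size", "268435456");           // 256 MB, illustrative
// minimum size for a node-local split left over at the end of pass 1
conf.set("mapred.min.split.size.per.node", "134217728");  // 128 MB, illustrative
// minimum size for a rack-local split left over in pass 2
conf.set("mapred.min.split.size.per.rack", "134217728");  // 128 MB, illustrative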
Below is the code used in practice.
The original input consists of small sequence files of about 70 MB each (some even smaller). A custom RecordReader has to be implemented (for plain Text it would be simpler); the key type is BytesWritable. The goal is to reduce the number of files so that each output file is close to the block size.
The custom CombineSequenceFileInputFormat:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, context,
                CombineSequenceFileRecordReader.class);
    }
}
The CombineSequenceFileRecordReader implementation:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.ReflectionUtils;

public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {
    private CombineFileSplit split;
    private TaskAttemptContext context;
    private int index;
    private RecordReader<K, V> rr;

    @SuppressWarnings("unchecked")
    public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
            throws IOException, InterruptedException {
        this.index = index;
        this.split = (CombineFileSplit) split;
        this.context = context;
        this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
    }

    @SuppressWarnings("unchecked")
    @Override
    public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
            throws IOException, InterruptedException {
        this.split = (CombineFileSplit) curSplit;
        this.context = curContext;

        if (null == rr) {
            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
        }

        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                this.split.getOffset(index), this.split.getLength(index),
                this.split.getLocations());
        this.rr.initialize(fileSplit, this.context);
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (null != rr) {
            rr.close();
            rr = null;
        }
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
        return rr.getCurrentKey();
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
        return rr.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return rr.nextKeyValue();
    }
}

Reference: http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java
The main driver is straightforward; it is included here as well for future reference:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MergeFiles extends Configured implements Tool {

    public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {
        public void map(BytesWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    } // END: MapClass

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "157286400");
        conf.setBoolean("mapred.output.compress", true);

        Job job = new Job(conf);
        job.setJobName("MergeFiles");
        job.setJarByClass(MergeFiles.class);

        job.setMapperClass(MapClass.class);
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    } // END: run

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new MergeFiles(), args);
        System.exit(ret);
    } // END: main
}
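Launching it is the usual ToolRunner-style invocation; the jar name and HDFS paths below are placeholders:

hadoop jar combine-input-example.jar com.hadoop.combineInput.MergeFiles /path/to/small-seqfiles /path/to/merged-output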
Performance test: 2,000 compressed sequence files of about 70 MB each were converted into 700 compressed sequence files averaging about 200 MB each (this size is tunable), with a block size of 256 MB; the job took about two and a half to three minutes.
Known issue:
After combining, a mapper's input may no longer be node-local, which adds extra overhead to the mappers; this trade-off needs to be weighed.