
CombineFileInputFormat in Hadoop Explained: Handling Large Numbers of Small Files

2014-03-10 17:17
In MapReduce practice there are often many small files. Each file gets its own mapper, which wastes resources, and if the job has no reduce phase it also emits many small output files; the file count explodes and hurts downstream Hive jobs.

So we want multiple files combined into a single split as mapper input, and CombineFileInputFormat does exactly that.

How CombineFileInputFormat works (summarized from an online write-up):

First pass: build splits from blocks that live on the same DataNode (DN), as follows:

1. Iterate nodeToBlocks to find which blocks each DN holds.

2. Loop over that DN's block list.

3. Remove each block from blockToNodes, so the same block cannot end up in more than one split.

4. Add the block to validBlocks, the list of blocks that have already been taken out of blockToNodes (kept so they can be put back later if needed).

5. Add the block's length to the running total curSplitSize.

6. Check whether curSplitSize has reached the configured maxSize:

a) if it has, create a split from validBlocks, add it to the split list, and reset curSplitSize and validBlocks;

b) if not, continue with the next block (back to step 2).

7. When the DN's block list is exhausted, check whether the leftover blocks may still form a split (i.e. whether their combined size reaches the per-DN minimum split size):

a) if so, create a split and add it;

b) if not, return the leftover blocks to blockToNodes.

8. Reset the temporaries.

9. Move on to the next DN (back to step 1).

(The maxSize and per-node / per-rack minimums mentioned above come from the job configuration; a sketch of how they are set follows this list.)
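These size thresholds are ordinary job configuration values. Below is a minimal sketch of setting them in the driver using the old-style property names this post uses elsewhere (mapred.max.split.size appears again in the driver further down); the per-node / per-rack property names and the 256 MB / 128 MB figures are my own illustration, not from the original post, so verify them against your Hadoop version:

    // in the job driver, before the Job object is created
    Configuration conf = new Configuration();
    // upper bound on a combined split (the maxSize checked in step 6)
    conf.set("mapred.max.split.size", String.valueOf(256L * 1024 * 1024));
    // lower bound for emitting a node-local split from leftovers (step 7)
    conf.set("mapred.min.split.size.per.node", String.valueOf(128L * 1024 * 1024));
    // lower bound for emitting a rack-local split in the second pass
    conf.set("mapred.min.split.size.per.rack", String.valueOf(128L * 1024 * 1024));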

    // process all nodes and create splits that are local
    // to a node.
    // (i.e. build splits out of blocks that live on the same DN)
    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
         iter.hasNext();) {

        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        nodes.add(one.getKey());
        List<OneBlockInfo> blocksInNode = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        for (OneBlockInfo oneblock : blocksInNode) {
            if (blockToNodes.containsKey(oneblock)) {
                validBlocks.add(oneblock);
                blockToNodes.remove(oneblock);
                curSplitSize += oneblock.length;

                // if the accumulated split size exceeds the maximum, then
                // create this split.
                if (maxSize != 0 && curSplitSize >= maxSize) {
                    // create an input split from the accumulated blocks and add it to the split list
                    addCreatedSplit(job, splits, nodes, validBlocks);
                    // reset the accumulators
                    curSplitSize = 0;
                    validBlocks.clear();
                }
            }
        }

        // if there were any blocks left over and their combined size is
        // larger than minSplitNode, then combine them into one split.
        // Otherwise add them back to the unprocessed pool. It is likely
        // that they will be combined with other blocks from the same rack later on.
        //
        // The comment above already says it, but restated in my own words, there are two cases:
        // 1. This DN still holds blocks that were not placed into a split, and their total
        //    size reaches the per-node minimum (without having reached the maximum):
        //    combine them into a single split.
        // 2. The leftover blocks are still too small: return them to blockToNodes
        //    so they can be handled later together with blocks from other nodes.
        if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
            // create an input split and add it to the splits array
            addCreatedSplit(job, splits, nodes, validBlocks);
        } else {
            for (OneBlockInfo oneblock : validBlocks) {
                blockToNodes.put(oneblock, oneblock.hosts);
            }
        }
        validBlocks.clear();
        nodes.clear();
        curSplitSize = 0;
    }

Second pass: combine the remaining blocks that are not on the same DN but are on the same rack (only blocks left over from the first pass):

    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these overflow
    // blocks will be combined into splits.
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    ArrayList<String> racks = new ArrayList<String>();

    // Process all racks over and over again until there is no more work to do.
    // Note that we are no longer dealing with blocks grouped by DN; those were
    // handled by the code above. blockToNodes now holds only the blocks that
    // have not yet been placed into any split.
    while (blockToNodes.size() > 0) {

        // Create one split for this rack before moving over to the next rack.
        // Come back to this rack after creating a single split for each of the
        // remaining racks.
        // Process one rack location at a time, combining all possible blocks that
        // reside on this rack as one split (constrained by minimum and maximum
        // split size).

        // iterate over all racks and build rack-local splits
        for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
                 rackToBlocks.entrySet().iterator(); iter.hasNext();) {

            Map.Entry<String, List<OneBlockInfo>> one = iter.next();
            racks.add(one.getKey());
            List<OneBlockInfo> blocks = one.getValue();

            // for each block, copy it into validBlocks. Delete it from
            // blockToNodes so that the same block does not appear in
            // two different splits.
            boolean createdSplit = false;
            for (OneBlockInfo oneblock : blocks) {
                // important: blockToNodes now contains exactly the blocks that
                // have not yet been assigned to any split
                if (blockToNodes.containsKey(oneblock)) {
                    validBlocks.add(oneblock);
                    blockToNodes.remove(oneblock);
                    curSplitSize += oneblock.length;

                    // if the accumulated split size exceeds the maximum, then
                    // create this split.
                    if (maxSize != 0 && curSplitSize >= maxSize) {
                        // create an input split and add it to the splits array
                        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
                        createdSplit = true;
                        break;
                    }
                }
            }

            // if we created a split, then just go to the next rack
            if (createdSplit) {
                curSplitSize = 0;
                validBlocks.clear();
                racks.clear();
                continue;
            }

            // Some blocks on this rack did not fill a whole split. If their combined
            // size reaches the per-rack minimum, create a split from them; otherwise
            // keep them in 'overflow' to be handled later.
            if (!validBlocks.isEmpty()) {
                if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
                    // if there is a minimum size specified, then create a single split
                    // otherwise, store these blocks into overflow data structure
                    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
                } else {
                    // There were a few blocks in this rack that remained to be processed.
                    // Keep them in 'overflow' block list. These will be combined later.
                    overflowBlocks.addAll(validBlocks);
                }
            }
            curSplitSize = 0;
            validBlocks.clear();
            racks.clear();
        }
    }

Finally, blocks that share neither a DN nor a rack (whatever is left after the first two passes) are combined. The remaining source code is straightforward, so I will not paste it here.
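For reference, the shape of that final pass is roughly the following; this is a paraphrased sketch built from the same variables used above (overflowBlocks, validBlocks, racks, curSplitSize, plus an assumed oneblock.racks field), not a verbatim copy of the Hadoop source:

    // sketch: accumulate the leftover blocks without regard to node or rack locality
    for (OneBlockInfo oneblock : overflowBlocks) {
        validBlocks.add(oneblock);
        curSplitSize += oneblock.length;
        // remember every rack these blocks touch so the split can still report locations
        for (String rack : oneblock.racks) {
            racks.add(rack);
        }
        // once the accumulated size reaches the maximum, emit a split and reset
        if (maxSize != 0 && curSplitSize >= maxSize) {
            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
            curSplitSize = 0;
            validBlocks.clear();
            racks.clear();
        }
    }
    // whatever is still left becomes one last split
    if (!validBlocks.isEmpty()) {
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    }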

Summary of the split-building code:

Merging happens in three passes: same DN, then same rack but different DNs, then across racks.

Blocks that can be combined end up in the same split.

Below is the code from my own use of it in practice.

The original input is a set of small sequence files of about 70 MB each (some smaller). A custom RecordReader has to be implemented (for plain Text input it would be simpler); the keys are BytesWritable. The goal is to reduce the number of files so that each output file is close to the block size.

The custom CombineSequenceFileInputFormat:

    package com.hadoop.combineInput;

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

    public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
            return new CombineFileRecordReader((CombineFileSplit) split, context, CombineSequenceFileRecordReader.class);
        }
    }
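As an alternative to setting mapred.max.split.size in the driver (shown further below), CombineFileInputFormat also exposes protected setters (setMaxSplitSize, setMinSplitSizeNode, setMinSplitSizeRack) that a subclass can call. A minimal sketch of a constructor one could add to the class above; the 128 MB value is only an illustration, not something from the original post:

    public CombineSequenceFileInputFormat() {
        // cap each combined split at roughly 128 MB (illustrative value)
        setMaxSplitSize(128L * 1024 * 1024);
    }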

The CombineSequenceFileRecordReader implementation:

    package com.hadoop.combineInput;

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {

        private CombineFileSplit split;
        private TaskAttemptContext context;
        private int index;
        private RecordReader<K, V> rr;

        @SuppressWarnings("unchecked")
        public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
                throws IOException, InterruptedException {
            this.index = index;
            this.split = (CombineFileSplit) split;
            this.context = context;
            this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
        }

        @SuppressWarnings("unchecked")
        @Override
        public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
                throws IOException, InterruptedException {
            this.split = (CombineFileSplit) curSplit;
            this.context = curContext;

            if (null == rr) {
                rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
            }

            // build a single-file FileSplit for the index-th file in the combined split
            // and delegate everything else to the wrapped SequenceFileRecordReader
            FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                    this.split.getOffset(index), this.split.getLength(index),
                    this.split.getLocations());
            this.rr.initialize(fileSplit, this.context);
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return rr.getProgress();
        }

        @Override
        public void close() throws IOException {
            if (null != rr) {
                rr.close();
                rr = null;
            }
        }

        @Override
        public K getCurrentKey() throws IOException, InterruptedException {
            return rr.getCurrentKey();
        }

        @Override
        public V getCurrentValue() throws IOException, InterruptedException {
            return rr.getCurrentValue();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return rr.nextKeyValue();
        }
    }
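One detail worth noting: CombineFileRecordReader builds one delegate reader per file in the combined split via reflection, and it requires the delegate class to provide exactly the (CombineFileSplit, TaskAttemptContext, Integer) constructor used above, where the Integer is the index of the file within the split. That is why the constructor records the index and why initialize() turns it into a single-file FileSplit from the path, offset, and length at that index before handing off to the wrapped SequenceFileRecordReader.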

Reference: http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java

The main class is straightforward; I am including it here for my own future reference:

    package com.hadoop.combineInput;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MergeFiles extends Configured implements Tool {

        public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {
            @Override
            public void map(BytesWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // identity map: pass every record through unchanged
                context.write(key, value);
            }
        } // END: MapClass

        public int run(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // 157286400 bytes = 150 MB: upper bound for each combined split
            conf.set("mapred.max.split.size", "157286400");
            conf.setBoolean("mapred.output.compress", true);

            Job job = new Job(conf);
            job.setJobName("MergeFiles");
            job.setJarByClass(MergeFiles.class);

            job.setMapperClass(MapClass.class);
            job.setInputFormatClass(CombineSequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(BytesWritable.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setNumReduceTasks(0);

            return job.waitForCompletion(true) ? 0 : 1;
        } // END: run

        public static void main(String[] args) throws Exception {
            int ret = ToolRunner.run(new MergeFiles(), args);
            System.exit(ret);
        } // END: main
    }
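Two small points about the driver: mapred.max.split.size is set on the Configuration before the Job is constructed, since the Job keeps its own copy of the configuration; and setNumReduceTasks(0) makes this a map-only job, so each combined split is written straight back out as one compressed sequence file.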

Performance test: 2,000 compressed sequence files of roughly 70 MB each were merged into about 700 compressed sequence files averaging about 200 MB each (the size is controllable), with a 256 MB block size; the job took about two and a half to three minutes. This is roughly what the 150 MB cap set above would predict: the third 70 MB block pushes a split past the maximum, so each map task covers about three input files, and 2,000 / 3 ≈ 700 output files.

Remaining issue:

After combining, the blocks feeding a single mapper are no longer guaranteed to be local to the node running it, so the mappers pay some extra remote-read cost; this has to be weighed against the savings from running far fewer tasks.