
Handling Small Files with CombineFileInputFormat

2015-04-22 16:35
This article is reposted from my original blog: http://www.javali.org/document/deal_small_files_with_combinefileinputformat_in_mapreduce.html

An earlier article, hadoop处理小文件问题, used Hadoop archive files to solve the resource and performance problems caused by huge numbers of small files. That approach has to be maintained by hand and is geared toward administrators, and once a har file has been created the archive can no longer be changed, so it only suits scenarios where a large batch of small files is written once.
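For reference, such an archive is built with the hadoop archive command-line tool; the archive name and paths below are placeholder examples, not taken from the earlier article:

hadoop archive -archiveName small.har -p /user/hadoop/small_files /user/hadoop/har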

Hadoop ships with another solution to the same problem: CombineFileInputFormat.

CombineFileInputFormat is an abstract class, so you must subclass it before it can be used:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyCombineInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        // CombineFileRecordReader creates one MyCombineFileRecordReader per
        // file chunk in the combined split, passing that chunk's index.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, MyCombineFileRecordReader.class);
    }

    public static class MyCombineFileRecordReader extends RecordReader<LongWritable, Text> {
        private LineRecordReader lineRecordReader;
        private final int index; // which file chunk of the combined split this reader handles

        // CombineFileRecordReader instantiates the reader reflectively and
        // requires exactly this constructor signature.
        public MyCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
            this.index = index;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            CombineFileSplit combineSplit = (CombineFileSplit) split;
            // Rebuild an ordinary FileSplit for the chunk at `index` and
            // delegate line-by-line reading to the standard LineRecordReader.
            FileSplit fileSplit = new FileSplit(
                    combineSplit.getPath(index),
                    combineSplit.getOffset(index),
                    combineSplit.getLength(index),
                    combineSplit.getLocations());
            lineRecordReader = new LineRecordReader();
            lineRecordReader.initialize(fileSplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            return lineRecordReader.nextKeyValue();
        }

        @Override
        public LongWritable getCurrentKey() {
            return lineRecordReader.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() {
            return lineRecordReader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException {
            return lineRecordReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            if (lineRecordReader != null) {
                lineRecordReader.close();
                lineRecordReader = null;
            }
        }
    }
}
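Note how the pieces fit together: CombineFileInputFormat packs many small files into each CombineFileSplit, and the framework-supplied CombineFileRecordReader walks the chunks of that split, constructing one MyCombineFileRecordReader per chunk (the Integer constructor argument is the chunk's index). Each per-chunk reader rebuilds an ordinary FileSplit and hands it to LineRecordReader, so records still arrive as (byte offset, line) pairs, exactly as they would under TextInputFormat.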
The job can then use MyCombineInputFormat directly when it is launched:

Job job = new Job(conf, "Market Prom Job");
job.setInputFormatClass(MyCombineInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
MultipleInputs.addInputPath(job, new Path(args[0]), MyCombineInputFormat.class, LogMap.class);
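The LogMap class referenced above is the author's mapper and is not shown in the post. A minimal sketch of what such a mapper could look like, assuming tab-delimited log lines and a count-by-first-field job; the field layout and output types are illustrative assumptions only:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical stand-in for the post's LogMap; the input key/value types
// match what MyCombineInputFormat emits (byte offset, line of text).
public class LogMap extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Illustrative logic: emit the first tab-delimited field of each log line with a count of 1.
        String[] fields = value.toString().split("\t");
        if (fields.length > 0 && !fields[0].isEmpty()) {
            outKey.set(fields[0]);
            context.write(outKey, ONE);
        }
    }
}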

Three more parameters also affect the number of map tasks:

Configuration conf = new Configuration();
// Combine at least 128 MB of data before emitting a node-local or rack-local
// split, and cap each combined split at 128 MB.
conf.setLong("mapred.min.split.size.per.node", 128 * 1024 * 1024);
conf.setLong("mapred.min.split.size.per.rack", 128 * 1024 * 1024);
conf.setLong("mapred.max.split.size", 128 * 1024 * 1024);

Configured this way, the job launches far fewer map tasks, and the improvement in computing performance is quite noticeable.
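As a rough worked example: 10,000 input files of 1 MB each would normally produce 10,000 map tasks, one per file, whereas with mapred.max.split.size at 128 MB they combine into about 10,000 MB / 128 MB ≈ 79 splits, and therefore about 79 map tasks (the exact count varies slightly with block placement).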

To see how CombineFileInputFormat reduces the number of maps, refer to this post: 深度分析如何在Hadoop中控制Map的数量 (an in-depth analysis of how to control the number of maps in Hadoop).

[End]