The HDFS small-file problem
On the large number of small files produced on HDFS when Flume collects data
Adjust the HDFS sink configuration in Flume:
increase hdfs.rollSize, or set it to 0 (do not roll based on file size)
set hdfs.rollCount to 0 (do not roll based on the number of events)
With rolling disabled this way, the sink keeps writing to one large file instead of producing many small ones; a sample sink configuration is sketched below.
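A minimal sketch of such a sink configuration. The agent name a1, sink name k1, and the HDFS path are placeholders for illustration; the roll properties are the point here:
# Flume HDFS sink (a1 and k1 are assumed names)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode:8020/flume/events/%Y%m%d
# roll when a file reaches ~128 MB (0 would disable size-based rolling as well)
a1.sinks.k1.hdfs.rollSize = 134217728
# 0 = never roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
# 0 = never roll based on elapsed time
a1.sinks.k1.hdfs.rollInterval = 0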
On HDFS small files and their solutions:
A small file is one whose size is well below the HDFS block size. Large numbers of such files cause serious scalability and performance problems for Hadoop: the NameNode keeps the file-system metadata in memory, so the total number of files the cluster can store is limited by the NameNode's memory. Each file, directory, and block entry takes roughly 150 bytes, so ten million small files already need on the order of 10,000,000 × 150 B ≈ 1.5 GB, i.e. about 2 GB of NameNode heap once block entries are counted, and one hundred million small files need around 20 GB. NameNode memory therefore becomes a hard limit on cluster growth. In addition, reading a huge number of small files is far slower than reading the same data as a few large files.
Solutions:
- Hadoop Archive (HAR): pack the small files into an archive
- SequenceFile: merge batches of small files into one large file
- Programmatic approach:
We build on Hadoop's built-in CombineFileInputFormat to handle a massive number of small files, so the remaining work is straightforward:
implement a RecordReader that reads the file blocks wrapped in a CombineFileSplit;
subclass CombineFileInputFormat to produce an input format that uses this custom RecordReader;
implement a Mapper that processes the data;
configure the MapReduce job that merges the small files.
Each step of the implementation is explained in detail below:
- The CombineSmallfileRecordReader class
We implement a RecordReader for CombineFileSplit and internally delegate to Hadoop's LineRecordReader to read the text lines of each small file. The implementation is as follows:
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {

    private CombineFileSplit combineFileSplit;
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    private int totalLength;
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    private BytesWritable currentValue = new BytesWritable();

    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index; // index of the small-file block this reader handles within the CombineFileSplit
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // To read one small-file block of the CombineFileSplit with LineRecordReader, wrap the block in a FileSplit first
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),
                combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);
        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        Text line = lineRecordReader.getCurrentValue();
        // Text.getBytes() returns the backing array, which can be longer than the content, so use getLength()
        currentValue.set(line.getBytes(), 0, line.getLength());
        return currentValue;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        } else {
            return false;
        }
    }

    @Override
    public float getProgress() throws IOException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
            return currentProgress;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
If your application has small files of different formats, you will need to select a different built-in RecordReader for each file type; that dispatch logic also lives in the class above.
- The CombineSmallfileInputFormat class
Having implemented a RecordReader for CombineFileSplit, we need a CombineFileInputFormat that hands out instances of this CombineSmallfileRecordReader. We therefore subclass CombineFileInputFormat and override createRecordReader. Our CombineSmallfileInputFormat looks like this:
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        // CombineFileRecordReader instantiates one CombineSmallfileRecordReader per file block in the split
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader =
                new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            throw new RuntimeException("Error to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }
}
The important point above is that the RecordReader must be created through CombineFileRecordReader, whose constructor takes exactly three arguments in this order: a CombineFileSplit, a TaskAttemptContext, and a Class<? extends RecordReader> (here CombineSmallfileRecordReader.class). CombineFileRecordReader then reflectively constructs one CombineSmallfileRecordReader per file in the split, which is why our custom RecordReader's constructor must have the (CombineFileSplit, TaskAttemptContext, Integer) signature shown earlier.
- The CombineSmallfileMapper class
Next, the Mapper implementation. The CombineSmallfileMapper class looks like this:
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CombineSmallfileMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

    private Text file = new Text();

    @Override
    protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // key each line by the name of the small file it came from (set by CombineSmallfileRecordReader)
        String fileName = context.getConfiguration().get("map.input.file.name");
        file.set(fileName);
        context.write(file, value);
    }
}
It is straightforward: each input text line is turned into a key/value pair keyed by the name of the small file it came from, and emitted.
- The CombineSmallfiles class
Finally, the main entry point, which configures the MapReduce job out of the pieces implemented above:
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.shirdrn.kodz.inaction.hadoop.smallfiles.IdentityReducer;

public class CombineSmallfiles {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinesmallfiles <in> <out>");
            System.exit(2);
        }

        conf.setInt("mapred.min.split.size", 1);
        conf.setLong("mapred.max.split.size", 26214400); // combine small files into splits of at most 25 MB
        conf.setInt("mapred.reduce.tasks", 5);

        Job job = new Job(conf, "combine smallfiles");
        job.setJarByClass(CombineSmallfiles.class);
        job.setMapperClass(CombineSmallfileMapper.class);
        job.setReducerClass(IdentityReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setInputFormatClass(CombineSmallfileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
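The driver references an IdentityReducer from the org.shirdrn.kodz.inaction.hadoop.smallfiles package that is not listed in this article. A minimal pass-through implementation consistent with the job's key/value types would look roughly like this (a sketch; the original class may differ):

package org.shirdrn.kodz.inaction.hadoop.smallfiles;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IdentityReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // pass every (file name, line bytes) pair through unchanged, so each reducer
        // simply writes its share of records into one large SequenceFile
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}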
Running the program
Let's now look at the result of merging the small files, which makes subsequent computation with the Hadoop MapReduce framework much more efficient.
- Preparation
jar -cvf combine-smallfiles.jar -C ./ org/shirdrn/kodz/inaction/hadoop/smallfiles
xiaoxiang@ubuntu3:~$ cd /opt/comodo/cloud/hadoop-1.0.3
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -mkdir /user/xiaoxiang/datasets/smallfiles
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/comodo/cloud/dataset/smallfiles/* /user/xiaoxiang/datasets/smallfiles
- Run results
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop jar combine-smallfiles.jar org.shirdrn.kodz.inaction.hadoop.smallfiles.combine.CombineSmallfiles /user/xiaoxiang/datasets/smallfiles /user/xiaoxiang/output/smallfiles/combine
13/03/23 21:52:09 INFO input.FileInputFormat: Total input paths to process : 117
13/03/23 21:52:09 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/23 21:52:09 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/23 21:52:10 INFO mapred.JobClient: Running job: job_201303111631_0038
13/03/23 21:52:11 INFO mapred.JobClient: map 0% reduce 0%
13/03/23 21:52:29 INFO mapred.JobClient: map 33% reduce 0%
13/03/23 21:52:32 INFO mapred.JobClient: map 55% reduce 0%
13/03/23 21:52:35 INFO mapred.JobClient: map 76% reduce 0%
13/03/23 21:52:38 INFO mapred.JobClient: map 99% reduce 0%
13/03/23 21:52:41 INFO mapred.JobClient: map 100% reduce 0%
13/03/23 21:53:02 INFO mapred.JobClient: map 100% reduce 20%
13/03/23 21:53:05 INFO mapred.JobClient: map 100% reduce 40%
13/03/23 21:53:14 INFO mapred.JobClient: map 100% reduce 60%
13/03/23 21:53:17 INFO mapred.JobClient: map 100% reduce 80%
13/03/23 21:53:32 INFO mapred.JobClient: map 100% reduce 100%
13/03/23 21:53:37 INFO mapred.JobClient: Job complete: job_201303111631_0038
13/03/23 21:53:37 INFO mapred.JobClient: Counters: 28
13/03/23 21:53:37 INFO mapred.JobClient: Job Counters
13/03/23 21:53:37 INFO mapred.JobClient: Launched reduce tasks=5
13/03/23 21:53:37 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=33515
13/03/23 21:53:37 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/23 21:53:37 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/03/23 21:53:37 INFO mapred.JobClient: Launched map tasks=1
13/03/23 21:53:37 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=69085
13/03/23 21:53:37 INFO mapred.JobClient: File Output Format Counters
13/03/23 21:53:37 INFO mapred.JobClient: Bytes Written=237510415
13/03/23 21:53:37 INFO mapred.JobClient: FileSystemCounters
13/03/23 21:53:37 INFO mapred.JobClient: FILE_BYTES_READ=508266867
13/03/23 21:53:37 INFO mapred.JobClient: HDFS_BYTES_READ=147037765
13/03/23 21:53:37 INFO mapred.JobClient: FILE_BYTES_WRITTEN=722417364
13/03/23 21:53:37 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=237510415
13/03/23 21:53:37 INFO mapred.JobClient: File Input Format Counters
13/03/23 21:53:37 INFO mapred.JobClient: Bytes Read=0
13/03/23 21:53:37 INFO mapred.JobClient: Map-Reduce Framework
13/03/23 21:53:37 INFO mapred.JobClient: Map output materialized bytes=214110010
13/03/23 21:53:37 INFO mapred.JobClient: Map input records=3510000
13/03/23 21:53:37 INFO mapred.JobClient: Reduce shuffle bytes=0
13/03/23 21:53:37 INFO mapred.JobClient: Spilled Records=11840717
13/03/23 21:53:37 INFO mapred.JobClient: Map output bytes=207089980
13/03/23 21:53:37 INFO mapred.JobClient: CPU time spent (ms)=64200
13/03/23 21:53:37 INFO mapred.JobClient: Total committed heap usage (bytes)=722665472
13/03/23 21:53:37 INFO mapred.JobClient: Combine input records=0
13/03/23 21:53:37 INFO mapred.JobClient: SPLIT_RAW_BYTES=7914
13/03/23 21:53:37 INFO mapred.JobClient: Reduce input records=3510000
13/03/23 21:53:37 INFO mapred.JobClient: Reduce input groups=117
13/03/23 21:53:37 INFO mapred.JobClient: Combine output records=0
13/03/23 21:53:37 INFO mapred.JobClient: Physical memory (bytes) snapshot=820969472
13/03/23 21:53:37 INFO mapred.JobClient: Reduce output records=3510000
13/03/23 21:53:37 INFO mapred.JobClient: Virtual memory (bytes) snapshot=3257425920
13/03/23 21:53:37 INFO mapred.JobClient: Map output records=3510000
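The five reducers write the merged data as SequenceFiles under the output directory. A quick way to inspect the result (the paths match the job invocation above; part file names such as part-r-00000 follow the usual convention of the new MapReduce API):
bin/hadoop fs -ls /user/xiaoxiang/output/smallfiles/combine
bin/hadoop fs -text /user/xiaoxiang/output/smallfiles/combine/part-r-00000 | head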