
The HDFS small file problem

2018-10-23 20:15

On Flume producing large numbers of small files when collecting data into HDFS

Modify the HDFS sink configuration in Flume:

  • Increase hdfs.rollSize, or set it to 0 (do not roll files based on size).
  • Set hdfs.rollCount to 0 (do not roll files based on the number of events).

With these settings the output is written as one large file instead of many small ones.
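As a minimal sketch (the agent name a1, channel c1, sink name k1, and the HDFS path are placeholders, not taken from this article), the relevant part of a Flume HDFS sink configuration might look like the lines below. Note that hdfs.rollInterval controls time-based rolling and defaults to 30 seconds, so it usually has to be set to 0 as well or the sink will still start a new file periodically:

# placeholder agent/sink names; only the roll-related properties matter here
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events
a1.sinks.k1.hdfs.fileType = DataStream
# 0 = never roll based on file size (alternatively set a large value, e.g. 134217728 for 128 MB)
a1.sinks.k1.hdfs.rollSize = 0
# 0 = never roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
# 0 = never roll based on elapsed time (default is 30 seconds)
a1.sinks.k1.hdfs.rollInterval = 0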

 

HDFS small files and how to deal with them:

Small files are files whose size is well below the HDFS block size. Large numbers of them cause serious scalability and performance problems for Hadoop. The NameNode keeps the file system metadata in memory, so the total number of files the cluster can store is limited by the NameNode's memory. Each file, directory, and block entry takes roughly 150 bytes, so ten million small files (each occupying its own block) need on the order of 10,000,000 × 2 × 150 bytes ≈ 3 GB of NameNode memory, and one hundred million need around 30 GB. NameNode memory therefore becomes a hard limit on how far the cluster can grow. In addition, accessing a large number of small files is far slower than accessing the same amount of data in a few large files, because every file adds NameNode lookups and extra disk seeks.

Solutions:

  1. Hadoop Archives (HAR): pack many small files into a single archive (see the example command after this list).
  2. SequenceFile: merge large batches of small files into one big file.
  3. A custom MapReduce implementation, described in detail below.
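As a hedged illustration of option 1, a Hadoop Archive can be built with the standard hadoop archive tool; the paths below are placeholders modeled on the dataset layout used later in this article:

# pack the smallfiles directory (relative to the -p parent) into a single .har archive
bin/hadoop archive -archiveName smallfiles.har -p /user/xiaoxiang/datasets smallfiles /user/xiaoxiang/archives
# the archived files remain readable through the har:// scheme
bin/hadoop fs -ls har:///user/xiaoxiang/archives/smallfiles.har

A HAR relieves NameNode memory pressure, but reading through the archive is not faster than reading the original files, which is why the programmatic approach below is often preferred for processing.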

We build on Hadoop's built-in CombineFileInputFormat to handle huge numbers of small files, so the work that has to be done is clear:

  • Implement a RecordReader that reads the small-file blocks wrapped in a CombineFileSplit.
  • Subclass CombineFileInputFormat to create an input format that uses this custom RecordReader.
  • Implement a Mapper that processes the combined data.
  • Configure a MapReduce job that uses the above to process the mass of small files.

Each step of the implementation is explained in detail below:

  • The CombineSmallfileRecordReader class

We implement a RecordReader for CombineFileSplit that internally uses Hadoop's own LineRecordReader to read the text lines of each small file. The code is shown below:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {

    private CombineFileSplit combineFileSplit;
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    private int totalLength;
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    private BytesWritable currentValue = new BytesWritable();

    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index; // index of the small-file block to process within the CombineFileSplit
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // Process one small-file block of the CombineFileSplit. Because we delegate to LineRecordReader,
        // the block first has to be wrapped in a FileSplit before any data can be read.
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);

        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        byte[] content = lineRecordReader.getCurrentValue().getBytes();
        currentValue.set(content, 0, content.length);
        return currentValue;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        } else {
            return false;
        }
    }

    @Override
    public float getProgress() throws IOException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
            return currentProgress;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}

If your use case involves small files in different formats, you need to use a different built-in RecordReader for each file type; that dispatch logic also belongs in the class above.

  • The CombineSmallfileInputFormat class

Having implemented a RecordReader for CombineFileSplit, we now need a CombineFileInputFormat that hands out instances of our CombineSmallfileRecordReader. To do that we write a subclass of CombineFileInputFormat and override its createRecordReader method. Our CombineSmallfileInputFormat is shown below:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        // CombineFileRecordReader iterates over the blocks in the CombineFileSplit and
        // creates one CombineSmallfileRecordReader per block.
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader = new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            throw new RuntimeException("Failed to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }
}

The important point above is that the RecordReader must be created through CombineFileRecordReader, whose constructor takes exactly three parameters in this order: a CombineFileSplit, a TaskAttemptContext, and a Class<? extends RecordReader>. CombineFileRecordReader then instantiates the given reader class reflectively for each block in the split, which is why CombineSmallfileRecordReader has to provide the (CombineFileSplit, TaskAttemptContext, Integer) constructor shown earlier.

  • The CombineSmallfileMapper class

Next, we implement the Mapper for our MapReduce job. The CombineSmallfileMapper class is shown below:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CombineSmallfileMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

    private Text file = new Text();

    @Override
    protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // The RecordReader stored the name of the current small file in the configuration;
        // use it as the output key so that all lines of one file share the same key.
        String fileName = context.getConfiguration().get("map.input.file.name");
        file.set(fileName);
        context.write(file, value);
    }
}

This is straightforward: each input text line is emitted as a key/value pair, keyed by the name of the small file it came from.

  • The CombineSmallfiles class

Finally, the driver class with the main entry point, which configures the MapReduce job built from the classes above:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.shirdrn.kodz.inaction.hadoop.smallfiles.IdentityReducer;

public class CombineSmallfiles {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinesmallfiles <in> <out>");
            System.exit(2);
        }

        conf.setInt("mapred.min.split.size", 1);
        conf.setLong("mapred.max.split.size", 26214400); // 25 MB per combined split
        conf.setInt("mapred.reduce.tasks", 5);

        Job job = new Job(conf, "combine smallfiles");
        job.setJarByClass(CombineSmallfiles.class);
        job.setMapperClass(CombineSmallfileMapper.class);
        job.setReducerClass(IdentityReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(CombineSmallfileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}

Running the program

Let's look at the result of merging the small files, which makes the data much better suited to efficient computation with the Hadoop MapReduce framework.

  • Preparation

jar -cvf combine-smallfiles.jar -C ./ org/shirdrn/kodz/inaction/hadoop/smallfiles
xiaoxiang@ubuntu3:~$ cd /opt/comodo/cloud/hadoop-1.0.3
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -mkdir /user/xiaoxiang/datasets/smallfiles
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/comodo/cloud/dataset/smallfiles/* /user/xiaoxiang/datasets/smallfiles

  • Run output

xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop jar combine-smallfiles.jar org.shirdrn.kodz.inaction.hadoop.smallfiles.combine.CombineSmallfiles /user/xiaoxiang/datasets/smallfiles /user/xiaoxiang/output/smallfiles/combine
13/03/23 21:52:09 INFO input.FileInputFormat: Total input paths to process : 117
13/03/23 21:52:09 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/23 21:52:09 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/23 21:52:10 INFO mapred.JobClient: Running job: job_201303111631_0038
13/03/23 21:52:11 INFO mapred.JobClient: map 0% reduce 0%
13/03/23 21:52:29 INFO mapred.JobClient: map 33% reduce 0%
13/03/23 21:52:32 INFO mapred.JobClient: map 55% reduce 0%
13/03/23 21:52:35 INFO mapred.JobClient: map 76% reduce 0%
13/03/23 21:52:38 INFO mapred.JobClient: map 99% reduce 0%
13/03/23 21:52:41 INFO mapred.JobClient: map 100% reduce 0%
13/03/23 21:53:02 INFO mapred.JobClient: map 100% reduce 20%
13/03/23 21:53:05 INFO mapred.JobClient: map 100% reduce 40%
13/03/23 21:53:14 INFO mapred.JobClient: map 100% reduce 60%
13/03/23 21:53:17 INFO mapred.JobClient: map 100% reduce 80%
13/03/23 21:53:32 INFO mapred.JobClient: map 100% reduce 100%
13/03/23 21:53:37 INFO mapred.JobClient: Job complete: job_201303111631_0038
13/03/23 21:53:37 INFO mapred.JobClient: Counters: 28
13/03/23 21:53:37 INFO mapred.JobClient: Job Counters
13/03/23 21:53:37 INFO mapred.JobClient: Launched reduce tasks=5
13/03/23 21:53:37 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=33515
13/03/23 21:53:37 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/23 21:53:37 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/03/23 21:53:37 INFO mapred.JobClient: Launched map tasks=1
13/03/23 21:53:37 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=69085
13/03/23 21:53:37 INFO mapred.JobClient: File Output Format Counters
13/03/23 21:53:37 INFO mapred.JobClient: Bytes Written=237510415
13/03/23 21:53:37 INFO mapred.JobClient: FileSystemCounters
13/03/23 21:53:37 INFO mapred.JobClient: FILE_BYTES_READ=508266867
13/03/23 21:53:37 INFO mapred.JobClient: HDFS_BYTES_READ=147037765
13/03/23 21:53:37 INFO mapred.JobClient: FILE_BYTES_WRITTEN=722417364
13/03/23 21:53:37 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=237510415
13/03/23 21:53:37 INFO mapred.JobClient: File Input Format Counters
13/03/23 21:53:37 INFO mapred.JobClient: Bytes Read=0
13/03/23 21:53:37 INFO mapred.JobClient: Map-Reduce Framework
13/03/23 21:53:37 INFO mapred.JobClient: Map output materialized bytes=214110010
13/03/23 21:53:37 INFO mapred.JobClient: Map input records=3510000
13/03/23 21:53:37 INFO mapred.JobClient: Reduce shuffle bytes=0
13/03/23 21:53:37 INFO mapred.JobClient: Spilled Records=11840717
13/03/23 21:53:37 INFO mapred.JobClient: Map output bytes=207089980
13/03/23 21:53:37 INFO mapred.JobClient: CPU time spent (ms)=64200
13/03/23 21:53:37 INFO mapred.JobClient: Total committed heap usage (bytes)=722665472
13/03/23 21:53:37 INFO mapred.JobClient: Combine input records=0
13/03/23 21:53:37 INFO mapred.JobClient: SPLIT_RAW_BYTES=7914
13/03/23 21:53:37 INFO mapred.JobClient: Reduce input records=3510000
13/03/23 21:53:37 INFO mapred.JobClient: Reduce input groups=117
13/03/23 21:53:37 INFO mapred.JobClient: Combine output records=0
13/03/23 21:53:37 INFO mapred.JobClient: Physical memory (bytes) snapshot=820969472
13/03/23 21:53:37 INFO mapred.JobClient: Reduce output records=3510000
13/03/23 21:53:37 INFO mapred.JobClient: Virtual memory (bytes) snapshot=3257425920
13/03/23 21:53:37 INFO mapred.JobClient: Map output records=3510000
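Because the job writes its result as SequenceFiles (file name as key, line content as value), the merged output can be checked directly from the command line. A minimal sketch is shown below; the part-r-0000x file name is the default reducer output naming, so with mapred.reduce.tasks=5 five such files are expected under the output path used above:

bin/hadoop fs -ls /user/xiaoxiang/output/smallfiles/combine
# 'fs -text' decodes SequenceFiles and prints the key/value pairs as text
bin/hadoop fs -text /user/xiaoxiang/output/smallfiles/combine/part-r-00000 | head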
