Hadoop对小文件的解决方案
2015-06-07 22:07
447 查看
小文件指的是那些size比HDFS的block size(默认64M)小的多的文件。任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中, 每一个object占用150 bytes的内存空间。所以,如果有10million个文件, 每一个文件对应一个block,那么就将要消耗namenode 3G的内存来保存这些block的信息。如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限。
控制小文件的方法有:
1、应用程序自己控制
2、archive
3、Sequence File / Map File
4、CombineFileInputFormat***
5、合并小文件,如HBase部分的compact
/article/1324859.html
通常对于”the small files problem”的回应会是:使用SequenceFile。
这种方法是说,使用filename作为key,并且file contents作为value。实践中这种方式非常管用。
如果有10000个100KB的文件,可以写一个程序来将这些小文件写入到一个单独的 SequenceFile中去,然后就可以在一个streaming fashion(directly or using mapreduce)中来使用这个sequenceFile。不仅如此,SequenceFiles也是splittable的,所以mapreduce 可以break them into chunks,并且分别的被独立的处理。和HAR不同的是,这种方式还支持压缩。 block的压缩在许多情况下都是最好的选择,因为它将多个 records压缩到一起,而不是一个record一个压缩。
在存储结构上, SequenceFile主要由一个Header后跟多条Record组成。
Header主要包含了Key classname, Value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。
每条Record以键值对的方式进行存储,用来表示它的字符数组可依次解析成:记录的长度、 Key的长度、 Key值和Value值,并且Value值的结构取决于该记录是否被压缩。
数据压缩有利于节省磁盘空间和加快网络传输, SeqeunceFile支持两种格式的数据压缩,分别是: record compression和block compression。
record compression是对每条记录的value进行压缩
block compression是将一连串的record组织到一起,统一压缩成一个block。
block信息主要存储了:块所包含的记录数、每条记录Key长度的集合、每条记录Key值的集合、每条记录Value长度的集合和每条记录Value值的集合
注:每个block的大小是可通过
具体可参考:
/article/1324852.html
MapFile
MapFile是排序后的SequenceFile,通过观察其目录结构可以看到
MapFile由两部分组成,分别是data和index。
index作为文件的数据索引,主要记录了每个Record的key值,以及
该Record在文件中的偏移位置。
在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言, MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。
需注意的是, MapFile并不会把所有Record都记录到index中去,默认情况下每隔128条记录存储一个索引映射。当然,记录间隔可人为修改,通过MapFIle.Writer的setIndexInterval()方法,或修改
另外,与SequenceFile不同的是, MapFile的KeyClass一定要实现
WritableComparable接口 ,即Key值是可比较的。
CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
**注:**CombineFileInputFormat是一个抽象类,需要编写一个继承类。
使用CombineFileInputFormat作为Map任务的输入规格描述,首先需要实现一个自定义的RecordReader。
CombineFileInputFormat的大致原理
它会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,因为小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包含了一组文件Block,包括每个文件的起始偏移(offset),长度(length),Block位置(localtions)等元数据。
如果想要处理一个 CombineFileSplit,很容易想到,对其包含的每个InputSplit(实际上这里面没有这个,你需要读取一个小文件块的时候,需要构造一 个FileInputSplit对象)。
在执行MapReduce任务的时候,需要读取文件的文本行(简单一点是文本行,也可能是其他格式数据)。
那么对于CombineFileSplit来说,你需要处理其包含的小文件Block,就要对应设置一个RecordReader,才能正确读取文件数据内容。
通常情况下,我们有一批小文件,格式通常是相同的,只需要在CombineFileSplit实现一个RecordReader的时候,
内置另一个用来读取小文件Block的RecordReader,这样就能保证读取CombineFileSplit内部聚积的小文件。
我们基于Hadoop内置的CombineFileInputFormat来实现处理海量小文件,需要做的工作,如下所示:
1、实现一个RecordReader来读取CombineFileSplit包装的文件Block
2、继承自CombineFileInputFormat实现一个使用我们自定义的RecordReader的输入规格说明类。
3、处理数据的Mapper实现类
4、配置用来处理海量小文件的MapReduce Job
控制小文件的方法有:
1、应用程序自己控制
2、archive
3、Sequence File / Map File
4、CombineFileInputFormat***
5、合并小文件,如HBase部分的compact
1、应用程序自己控制
final Path path = new Path("/combinedfile"); final FSDataOutputStream create = fs.create(path); final File dir = new File("C:\\Windows\\System32\\drivers\\etc"); for(File fileName : dir.listFiles()) { System.out.println(fileName.getAbsolutePath()); final FileInputStream fileInputStream = new FileInputStream(fileName.getAbsolutePath()); final List<String> readLines = IOUtils.readLines(fileInputStream); for (String line : readLines) { create.write(line.getBytes()); } fileInputStream.close(); } create.close();
2、archive 命令行操作
具体参考如下:/article/1324859.html
3、Sequence File/Map File
Sequence File通常对于”the small files problem”的回应会是:使用SequenceFile。
这种方法是说,使用filename作为key,并且file contents作为value。实践中这种方式非常管用。
如果有10000个100KB的文件,可以写一个程序来将这些小文件写入到一个单独的 SequenceFile中去,然后就可以在一个streaming fashion(directly or using mapreduce)中来使用这个sequenceFile。不仅如此,SequenceFiles也是splittable的,所以mapreduce 可以break them into chunks,并且分别的被独立的处理。和HAR不同的是,这种方式还支持压缩。 block的压缩在许多情况下都是最好的选择,因为它将多个 records压缩到一起,而不是一个record一个压缩。
在存储结构上, SequenceFile主要由一个Header后跟多条Record组成。
Header主要包含了Key classname, Value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。
每条Record以键值对的方式进行存储,用来表示它的字符数组可依次解析成:记录的长度、 Key的长度、 Key值和Value值,并且Value值的结构取决于该记录是否被压缩。
数据压缩有利于节省磁盘空间和加快网络传输, SeqeunceFile支持两种格式的数据压缩,分别是: record compression和block compression。
record compression是对每条记录的value进行压缩
block compression是将一连串的record组织到一起,统一压缩成一个block。
block信息主要存储了:块所包含的记录数、每条记录Key长度的集合、每条记录Key值的集合、每条记录Value长度的集合和每条记录Value值的集合
注:每个block的大小是可通过
io.seqfile.compress.blocksize属性来指定的。
Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(conf); Path seqFile=new Path("seqFile.seq"); //Reader内部类用于文件的读取操作 SequenceFile.Reader reader=new SequenceFile.Reader(fs,seqFile,conf); //Writer内部类用于文件的写操作,假设Key和Value都为Text类型 SequenceFile.Writer writer=new SequenceFile.Writer(fs,conf,seqFile,Text.class,Text.class); //通过writer向文档中写入记录 writer.append(new Text("key"),new Text("value")); IOUtils.closeStream(writer);//关闭write流 //通过reader从文档中读取记录 Text key=new Text(); Text value=new Text(); while(reader.next(key,value)) { System.out.println(key); System.out.println(value); } IOUtils.closeStream(reader);//关闭read流
具体可参考:
/article/1324852.html
MapFile
MapFile是排序后的SequenceFile,通过观察其目录结构可以看到
MapFile由两部分组成,分别是data和index。
index作为文件的数据索引,主要记录了每个Record的key值,以及
该Record在文件中的偏移位置。
在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言, MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。
需注意的是, MapFile并不会把所有Record都记录到index中去,默认情况下每隔128条记录存储一个索引映射。当然,记录间隔可人为修改,通过MapFIle.Writer的setIndexInterval()方法,或修改
io.map.index.interval属性;
另外,与SequenceFile不同的是, MapFile的KeyClass一定要实现
WritableComparable接口 ,即Key值是可比较的。
Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(conf); Path mapFile=new Path("mapFile.map"); //Writer内部类用于文件的写操作,假设Key和Value都为Text类型 MapFile.Writer writer=new MapFile.Writer(conf,fs,mapFile.toString(),Text.class,Text.class); //通过writer向文档中写入记录 writer.append(new Text("key"),new Text("value")); IOUtils.closeStream(writer);//关闭write流 //Reader内部类用于文件的读取操作 MapFile.Reader reader=new MapFile.Reader(fs,mapFile.toString(),conf); //通过reader从文档中读取记录 Text key=new Text(); Text value=new Text(); while(reader.next(key,value)) { System.out.println(key); System.out.println(value); } IOUtils.closeStream(reader);//关闭read流
5、CombineFileInputFormat
相对于大量的小文件来说,hadoop更合适处理少量的大文件。CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
**注:**CombineFileInputFormat是一个抽象类,需要编写一个继承类。
使用CombineFileInputFormat作为Map任务的输入规格描述,首先需要实现一个自定义的RecordReader。
CombineFileInputFormat的大致原理
它会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,因为小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包含了一组文件Block,包括每个文件的起始偏移(offset),长度(length),Block位置(localtions)等元数据。
如果想要处理一个 CombineFileSplit,很容易想到,对其包含的每个InputSplit(实际上这里面没有这个,你需要读取一个小文件块的时候,需要构造一 个FileInputSplit对象)。
在执行MapReduce任务的时候,需要读取文件的文本行(简单一点是文本行,也可能是其他格式数据)。
那么对于CombineFileSplit来说,你需要处理其包含的小文件Block,就要对应设置一个RecordReader,才能正确读取文件数据内容。
通常情况下,我们有一批小文件,格式通常是相同的,只需要在CombineFileSplit实现一个RecordReader的时候,
内置另一个用来读取小文件Block的RecordReader,这样就能保证读取CombineFileSplit内部聚积的小文件。
我们基于Hadoop内置的CombineFileInputFormat来实现处理海量小文件,需要做的工作,如下所示:
1、实现一个RecordReader来读取CombineFileSplit包装的文件Block
2、继承自CombineFileInputFormat实现一个使用我们自定义的RecordReader的输入规格说明类。
3、处理数据的Mapper实现类
4、配置用来处理海量小文件的MapReduce Job
package SmallFile; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable,BytesWritable> { @Override public RecordReader<LongWritable, BytesWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException { CombineFileSplit combineFileSplit = (CombineFileSplit)(split); CombineFileRecordReader<LongWritable,BytesWritable> recordReader = new CombineFileRecordReader<LongWritable,BytesWritable> (combineFileSplit, context,CombineSmallfileRecordReader.class); try { recordReader.initialize(combineFileSplit, context); } catch (InterruptedException e) { e.printStackTrace(); } return recordReader; } } class CombineSmallfileRecordReader extends RecordReader<LongWritable,BytesWritable> { private CombineFileSplit combineFileSplit; private LineRecordReader lineRecordReader = new LineRecordReader(); private Path[] paths; private int totalLength; private int currentIndex; private float currentProgress = 0; private LongWritable currentKey; private BytesWritable currentValue; public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,TaskAttemptContext context,Integer index) { super(); this.combineFileSplit = combineFileSplit; this.currentIndex = index; } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations()); lineRecordReader.initialize(fileSplit, context); this.paths = combineFileSplit.getPaths(); //分区所在的所有地址 context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName()); //设置输入文件名 } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(currentIndex>=0 && currentIndex<totalLength) { return lineRecordReader.nextKeyValue(); } return false; } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { currentKey = lineRecordReader.getCurrentKey(); return currentKey; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { byte[]value = lineRecordReader.getCurrentValue().getBytes(); currentValue.set(value, 0, value.length); return currentValue; } @Override public float getProgress() throws IOException, InterruptedException { if(currentIndex>=0 && currentIndex<totalLength) { currentProgress = currentIndex/totalLength; return currentProgress; } return currentProgress; } @Override public void close() throws IOException { lineRecordReader.close(); } }
相关文章推荐
- Linux下安装zookeeper
- 【转】Twitter Storm: 在生产集群上运行topology
- tomcat SSL配置
- wordpress 迁移网站更改域名解决图片无法显示
- Storm监控文件夹变化 统计文件单词数量
- 基于heartbeat v1+ldirectord实现LVS集群高可用 推荐
- CentOS7 + mysql-cluster-7.4.6 配置部署
- Hadoop学习笔记(十五)---Hbase shell命令的使用
- linux内存管理之DMA
- 【原创】记一次线上Tomcat故障排查-struts2 “bug”导致tomcat阻塞
- linux系统Samba服务器的搭建与配置
- 初学Linux 命令
- 初识Linux-C/C++开发环境
- win8下安装kali Linux
- 使用 Apache POI读取EXCEL文件
- 更新CentOS 5.5 yum源
- wordpress 迁移网站更改域名解决图片无法显示
- Linux系统DNS服务器的搭建与配置
- 使用Animate.css和wow.js,实现各大网站常用的页面加载动画
- 几种开源SIP协议栈对比OPAL,VOCAL,sipX,ReSIProcate,oSIP