HFileInputFormat Implementation
2012-07-27 16:48
HBase stores its data on disk in the HFile format. These files can be read directly as MapReduce input, so a job can scan HFiles without going through the HBase API. The code is as follows:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * This is a direct port (hopefully) of the Scala version of this class
 * available at https://gist.github.com/1120311
 *
 * @author yuankang
 */
public class HFileInputFormat extends
        FileInputFormat<ImmutableBytesWritable, KeyValue> {

    private class HFileRecordReader extends
            RecordReader<ImmutableBytesWritable, KeyValue> {

        private HFile.Reader reader;
        private final HFileScanner scanner;
        private int entryNumber = 0;
        // Whether seekTo() found a first entry in the file.
        private boolean hasFirst;

        public HFileRecordReader(FileSplit split, Configuration conf)
                throws IOException {
            SchemaMetrics.configureGlobally(conf);
            final Path path = split.getPath();
            reader = HFile.createReader(FileSystem.get(conf), path,
                    new CacheConfig(conf));
            scanner = reader.getScanner(false, false);
            reader.loadFileInfo(); // required, or else seekTo() throws an NPE
            hasFirst = scanner.seekTo(); // positions the scanner on the first entry
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // All setup is done in the constructor; nothing to do here.
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (entryNumber == 0) {
                // seekTo() in the constructor already reached the first entry;
                // report it here instead of skipping past it.
                entryNumber++;
                return hasFirst;
            }
            entryNumber++;
            return scanner.next();
        }

        @Override
        public ImmutableBytesWritable getCurrentKey() throws IOException,
                InterruptedException {
            return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
        }

        @Override
        public KeyValue getCurrentValue() throws IOException,
                InterruptedException {
            return scanner.getKeyValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (reader != null) {
                // Cast to float; integer division would always yield 0.
                return (float) entryNumber / reader.getEntries();
            }
            return 1;
        }

        @Override
        public void close() throws IOException {
            if (reader != null) {
                reader.close();
            }
        }
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // An HFile must be read sequentially by a single scanner.
        return false;
    }

    @Override
    public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new HFileRecordReader((FileSplit) split,
                context.getConfiguration());
    }
}
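To show how the class above would be wired into a job, here is a minimal driver sketch. It is not from the original post: the class name HFileMRDriver, the job name, the identity-map setup, and the choice of SequenceFileOutputFormat are all assumptions; the input path would point at a directory of HFiles (for example, a copied column-family directory of a region).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Hypothetical driver: runs a map-only job that scans HFiles with the
// HFileInputFormat defined above and writes (row key, KeyValue) pairs out.
public class HFileMRDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hfile-scan"); // hadoop-1.x era constructor
        job.setJarByClass(HFileMRDriver.class);

        job.setInputFormatClass(HFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Map-only identity job: no mapper class set, so the default
        // Mapper passes each (ImmutableBytesWritable, KeyValue) through.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

This cannot run without the Hadoop and HBase jars of that era on the classpath; it is meant only as a sketch of how the input format plugs into a job.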