A CombineFileInputFormat Implementation for Hadoop Streaming
2012-09-24 14:17
Hadoop 0.21 has a serious version problem: streaming jobs cannot use CombineFileInputFormat correctly. Working around it requires patching part of the Hadoop source and implementing a custom CombineFileLineRecordReader.
Source patch: the file org.apache.hadoop.mapred.lib.CombineFileInputFormat.java inside hadoop-mapred-0.21.0.jar.
Streaming requires task splits of type org.apache.hadoop.mapred.InputSplit (the old API), but the splits actually produced are of type org.apache.hadoop.mapreduce.lib.input.CombineFileSplit (the new API), so getSplits must convert between the two split types:
```java
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  List<org.apache.hadoop.mapreduce.InputSplit> splits = super.getSplits(new Job(job));
  int size = splits.size();
  if (size > 0 && splits.get(0) instanceof org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) {
    InputSplit[] returnSplits = new InputSplit[size];
    for (int i = 0; i < size; i++) {
      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit combineFileSplit =
          (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) splits.get(i);
      Path[] paths = combineFileSplit.getPaths();
      long[] starts = combineFileSplit.getStartOffsets();
      long[] lengths = combineFileSplit.getLengths();
      String[] locations = combineFileSplit.getLocations();
      // re-wrap each new-API split as an old-API CombineFileSplit
      returnSplits[i] = new CombineFileSplit(job, paths, starts, lengths, locations);
    }
    return returnSplits;
  } else {
    // splits of any other type cannot be converted to the old API, so fail fast
    throw new IOException("Unexpected split type: expected "
        + "org.apache.hadoop.mapreduce.lib.input.CombineFileSplit");
  }
}
```

Next, implement a CombineFileLineRecordReader that reads streaming data; both key and value are Text (change the types to suit your needs):
```java
package test.com.cn;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

@SuppressWarnings("deprecation")
public class CombineFileLineRecordReader implements RecordReader<Text, Text> {

  private CombineFileSplit split;
  private int currentFileIndex;   // index of the file currently being read within the split
  private long currentOffset;     // current position within the current file
  private long readByte;          // total bytes read so far across the whole split
  private long totalBytes;        // total bytes in the split
  private int totalFiles;         // number of files contained in the split
  private FileSystem fs;
  private Path[] paths;           // paths of all files in the split
  private long[] lengths;         // length in bytes of each file chunk in the split
  private long endOffset;         // end offset of the chunk in the current file
  private FSDataInputStream fileIn;
  private LineReader reader;

  // Quite different from the new API: the new API instantiates a separate
  // CombineFileLineRecordReader for each file in a split, while the old API
  // uses a single reader to process all data in the split, including data
  // from different files that were packed into the same split.
  public CombineFileLineRecordReader(CombineFileSplit split, Configuration conf)
      throws IOException {
    this.split = split;
    fs = FileSystem.get(conf);
    this.currentFileIndex = 0;
    this.readByte = 0;
    this.totalBytes = split.getLength();
    this.totalFiles = split.getNumPaths();
    this.paths = split.getPaths();
    this.lengths = split.getLengths();
    // open the first file in the split
    openFile(currentFileIndex);
  }

  // open the file at the given index and seek to the start offset of its chunk
  private void openFile(int index) throws IOException {
    fileIn = fs.open(paths[index]);
    currentOffset = split.getOffset(index);
    fileIn.seek(currentOffset);
    endOffset = currentOffset + lengths[index];
    reader = new LineReader(fileIn);
  }

  @Override
  public boolean next(Text key, Text value) throws IOException {
    if (currentFileIndex >= totalFiles) {
      return false;   // the whole split has been consumed
    }
    if (currentOffset >= endOffset) {
      // current file chunk is exhausted: close it and move on to the next file
      reader.close();
      fileIn.close();
      currentFileIndex++;
      if (currentFileIndex >= totalFiles) {
        return false; // the whole split has been consumed
      }
      openFile(currentFileIndex);
    }
    int newSize = 0;
    if (currentOffset < endOffset) {
      // the key is left empty; each input line is delivered as the value
      newSize = reader.readLine(value);
      readByte += newSize;
      currentOffset += newSize;
    }
    return newSize != 0;
  }

  @Override
  public void close() throws IOException {
    if (reader != null) {
      reader.close();
    }
    if (fileIn != null) {
      fileIn.close();
    }
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public Text createValue() {
    return new Text();
  }

  @Override
  public float getProgress() throws IOException {
    return (float) (readByte * 1.0 / totalBytes);
  }

  @Override
  public long getPos() throws IOException {
    return readByte;
  }
}
```
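For comparison, under the new (mapreduce) API the framework-supplied CombineFileRecordReader does this per-file iteration itself, instantiating a fresh reader for each file chunk in the combined split. A minimal sketch of that approach, where the class name NewApiCombineInputFormat and the per-file reader PerFileLineReader are hypothetical, not part of this article's code:

```java
package test.com.cn;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class NewApiCombineInputFormat extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    // PerFileLineReader is a hypothetical RecordReader<LongWritable, Text>
    // that reads one file chunk; CombineFileRecordReader instantiates it once
    // per file via reflection, which requires it to expose the constructor
    // (CombineFileSplit, TaskAttemptContext, Integer).
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, PerFileLineReader.class);
  }
}
```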
Back in the old API, wire the reader into an input format that streaming can use:

```java
package test.com.cn;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class MyCombineFileInputFormat extends CombineFileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    return new CombineFileLineRecordReader((CombineFileSplit) split, job);
  }
}
```
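To run this, the patched CombineFileInputFormat must be rebuilt into hadoop-mapred-0.21.0.jar, and the two classes above packaged into a jar shipped with the job. A sketch of one possible streaming invocation; the streaming jar location, the jar name myformat.jar, the mapper, and the HDFS paths are placeholders, so adjust them to your deployment:

```sh
hadoop jar $HADOOP_HOME/mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
    -libjars myformat.jar \
    -inputformat test.com.cn.MyCombineFileInputFormat \
    -input /data/small-files \
    -output /data/combined-out \
    -mapper /bin/cat \
    -reducer NONE
```

With -inputformat pointing at MyCombineFileInputFormat, many small files are packed into far fewer map tasks than the default line-oriented input format would create, which is the whole point of using CombineFileInputFormat here.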