
A streaming-mode CombineFileInputFormat implementation

2012-09-24 14:17
Hadoop version differences matter a lot here: in 0.21, streaming jobs cannot use CombineFileInputFormat correctly. Working around this requires modifying part of the Hadoop source and implementing a CombineFileLineRecordReader.

Source modification: the file org.apache.hadoop.mapred.lib.CombineFileInputFormat.java inside hadoop-mapred-0.21.0.jar.

Streaming requires task splits of the old-API type org.apache.hadoop.mapred.InputSplit, while the splits actually produced are new-API org.apache.hadoop.mapreduce.lib.input.CombineFileSplit instances, so the split type has to be converted in getSplits():

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  // Delegate to the new-API implementation to compute the combined splits.
  List<org.apache.hadoop.mapreduce.InputSplit> splits = super.getSplits(new Job(job));
  int size = splits.size();

  if (size > 0 && splits.get(0) instanceof org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) {
    // Convert each new-API CombineFileSplit into an old-API (mapred) CombineFileSplit.
    InputSplit[] returnSplits = new InputSplit[size];
    for (int i = 0; i < size; i++) {
      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit combineFileSplit =
          (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) splits.get(i);
      Path[] paths = combineFileSplit.getPaths();
      long[] starts = combineFileSplit.getStartOffsets();
      long[] lengths = combineFileSplit.getLengths();
      String[] locations = combineFileSplit.getLocations();

      returnSplits[i] = new CombineFileSplit(job, paths, starts, lengths, locations);
    }
    return returnSplits;
  } else {
    // Fallback; in practice the parent class always returns CombineFileSplit instances.
    return splits.toArray(new InputSplit[0]);
  }
}
/**
 * A CombineFileLineRecordReader that reads streaming input; both key and value are Text
 * (change the types to suit your needs).
 */

package test.com.cn;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

@SuppressWarnings("deprecation")
public class CombineFileLineRecordReader implements RecordReader<Text, Text> {

  private CombineFileSplit split;

  private int currentFileIndex;  // index of the file currently being processed within the split
  private long currentOffset;    // current read offset within that file
  private long readByte;         // total number of bytes read so far
  private long totalBytes;       // total number of bytes in the split
  private int totalFiles;        // number of files contained in the split
  private FileSystem fs;
  private Path[] paths;          // paths of all files in the split
  private long[] lengths;        // length (in bytes) of each file chunk in the split
  private long endOffset;        // offset at which the current file chunk ends

  private FSDataInputStream fileIn;
  private LineReader reader;

  public CombineFileLineRecordReader(CombineFileSplit split, Configuration conf) throws IOException {
    // This differs considerably from the new API: the new API creates one record reader
    // per file in a split (see the new-API sketch after this class), whereas the old API
    // uses a single reader for all of the data in the split, even when that data comes
    // from several different files.
    this.split = split;
    fs = FileSystem.get(conf);

    this.currentFileIndex = 0;
    this.currentOffset = 0;
    this.readByte = 0;
    this.totalBytes = split.getLength();
    this.totalFiles = split.getNumPaths();
    this.paths = split.getPaths();
    this.lengths = split.getLengths();

    // Open the first file of the split.
    Path file = paths[currentFileIndex];
    fileIn = fs.open(file);
    this.currentOffset = this.split.getOffset(currentFileIndex);
    fileIn.seek(this.currentOffset);
    this.endOffset = this.currentOffset + this.lengths[currentFileIndex];
    reader = new LineReader(fileIn);
  }

  @Override
  public boolean next(Text key, Text value) throws IOException {
    System.err.write("next() called.\n".getBytes());
    if (currentFileIndex >= totalFiles) {   // all data has been read
      return false;
    }

    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }

    System.err.write("starts to read data.\n".getBytes());
    if (currentOffset >= endOffset) {       // finished with the current file
      this.reader.close();                  // close the previous file's reader
      this.fileIn.close();
      currentFileIndex++;                   // move on to the next file
      if (currentFileIndex >= totalFiles) { // all data has been read
        return false;
      }
      // Open the next file of the split.
      Path file = paths[currentFileIndex];
      fileIn = fs.open(file);
      this.currentOffset = this.split.getOffset(currentFileIndex);
      fileIn.seek(this.currentOffset);
      this.endOffset = this.currentOffset + this.lengths[currentFileIndex];
      reader = new LineReader(fileIn);
    }

    int newSize = 0;
    if (currentOffset < endOffset) {
      newSize = reader.readLine(value);
      this.readByte += newSize;
      currentOffset += newSize;
    }
    return newSize != 0;
  }

  @Override
  public void close() throws IOException {
    // Release the reader and the underlying stream of the last file processed.
    if (reader != null) reader.close();
    if (fileIn != null) fileIn.close();
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public Text createValue() {
    return new Text();
  }

  @Override
  public float getProgress() throws IOException {
    // Fraction of the split's total bytes consumed so far.
    return (float) (this.readByte * 1.0 / this.totalBytes);
  }

  @Override
  public long getPos() throws IOException {
    return this.readByte;
  }

}
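
For comparison with the comment in the constructor above, here is a minimal sketch of how the new (mapreduce) API handles the same task: CombineFileRecordReader creates one per-file reader, identified by an Integer index, for every file in the combined split. The class names NewApiCombineFileInputFormat and PerFileLineReader are illustrative and not part of the original code; treat this as a sketch rather than a drop-in implementation.

package test.com.cn;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

public class NewApiCombineFileInputFormat extends CombineFileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException {
    // CombineFileRecordReader builds one PerFileLineReader per file index in the split.
    return new CombineFileRecordReader<Text, Text>(
        (CombineFileSplit) split, context, PerFileLineReader.class);
  }

  // Reads only the file chunk identified by "index"; the constructor signature
  // (CombineFileSplit, TaskAttemptContext, Integer) is required by CombineFileRecordReader.
  public static class PerFileLineReader extends RecordReader<Text, Text> {
    private final CombineFileSplit split;
    private final int index;
    private FSDataInputStream in;
    private LineReader reader;
    private long start, pos, end;
    private final Text key = new Text();
    private final Text value = new Text();

    public PerFileLineReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
      this.split = split;
      this.index = index;
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      Path file = split.getPath(index);
      FileSystem fs = file.getFileSystem(context.getConfiguration());
      in = fs.open(file);
      start = split.getOffset(index);
      end = start + split.getLength(index);
      pos = start;
      in.seek(start);
      reader = new LineReader(in);
    }

    @Override
    public boolean nextKeyValue() throws IOException {
      if (pos >= end) {
        return false;
      }
      key.set(split.getPath(index).getName());  // key: file name (illustrative choice)
      int read = reader.readLine(value);        // value: one line of text
      pos += read;
      return read > 0;
    }

    @Override
    public Text getCurrentKey() {
      return key;
    }

    @Override
    public Text getCurrentValue() {
      return value;
    }

    @Override
    public float getProgress() {
      return end == start ? 1.0f : Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException {
      if (reader != null) {
        reader.close();
      }
    }
  }
}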

The custom InputFormat itself only has to hand each old-API CombineFileSplit to the record reader above:

package test.com.cn;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class MyCombineFileInputFormat extends CombineFileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    return new CombineFileLineRecordReader((CombineFileSplit) split, job);
  }
}
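
One way to sanity-check the whole chain is a small local driver that asks MyCombineFileInputFormat for its splits and then drains each record reader. This is only a sketch under a few assumptions not stated in the original post: the modified hadoop-mapred-0.21.0.jar is on the classpath, the input path passed on the command line exists, the split-size property name matches your Hadoop release (older releases use mapred.max.split.size), and the class name CombineSmokeTest is hypothetical.

package test.com.cn;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CombineSmokeTest {

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // Cap the size of each combined split; 0.21-era property name, assumed here.
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    MyCombineFileInputFormat inputFormat = new MyCombineFileInputFormat();
    InputSplit[] splits = inputFormat.getSplits(conf, 1);
    System.out.println("splits: " + splits.length);

    for (InputSplit split : splits) {
      RecordReader<Text, Text> reader = inputFormat.getRecordReader(split, conf, Reporter.NULL);
      Text key = reader.createKey();
      Text value = reader.createValue();
      long records = 0;
      while (reader.next(key, value)) {
        records++;
      }
      reader.close();
      System.out.println("records in split: " + records);
    }
  }
}

For an actual streaming job, the same pieces are wired up on the command line: the jar containing these classes is shipped with -libjars, and the input format is selected with -inputformat test.com.cn.MyCombineFileInputFormat.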