您的位置:首页 > 其它

mapreduce中map方法一次读取整个文件

2016-01-25 13:31 337 查看
最近有一个项目,需要将爬虫获取的众多网页解析出来,并将内容插入hbase,考虑采用mapreduce来实现。由于一个html文件最大不过几M,所以一个文件将会交给一个map处理,mapreduce中最常见的是按行读取文本文件,而我们需要的是一次读取整个文件内容,然后在map方法中用jsoup解析内容。现将实现一次读取整个文件内容的代码贴出来。

1、定义一个FileInputFormat类

public class WholeFileInputFormat extends FileInputFormat<Text,Text>{

@Override
public RecordReader<Text, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
RecordReader<Text,Text> recordReader = new WholeFileRecordReader();
return recordReader;
}

}


2、自定义RecordReader方法

public class WholeFileRecordReader extends RecordReader<Text,Text>{

private FileSplit fileSplit;
private JobContext jobContext;
private Text currentKey = new Text();
private Text currentValue = new Text();
private boolean finishConverting = false;
@Override
public void close() throws IOException {
// TODO Auto-generated method stub

}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return currentKey;
}

@Override
public Text getCurrentValue() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return currentValue;
}

@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
float progress = 0;
if(finishConverting){
progress = 1;
}
return progress;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) arg0;
this.jobContext = arg1;
String filename = fileSplit.getPath().getName();
this.currentKey = new Text(filename);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
if(!finishConverting){
int len = (int)fileSplit.getLength();
//          byte[] content = new byte[len];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(jobContext.getConfiguration());
FSDataInputStream in = fs.open(file);
BufferedReader br = new BufferedReader(new InputStreamReader(in,"gbk"));
//          BufferedReader br = new BufferedReader(new InputStreamReader(in,"utf-8"));
String line="";
String total="";
while((line= br.readLine())!= null){
total =total+line+"\n";
}
br.close();
in.close();
fs.close();
currentValue = new Text(total);
finishConverting = true;
return true;
}
return false;
}

}


实现RecordReader接口,最核心的就是处理好迭代多行文本的内容的逻辑,每次迭代通过调用nextKeyValue()方法来判断是否还有可读的文本行,直接设置当前的Key和Value,分别在方法getCurrentKey()和getCurrentValue()中返回对应的值。

更多信息请参考
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: