mapreduce中map方法一次读取整个文件
2016-01-25 13:31
337 查看
最近有一个项目,需要将爬虫获取的众多网页解析出来,并将内容插入hbase,考虑采用mapreduce来实现。由于一个html文件最大不过几M,所以一个文件将会交给一个map处理,mapreduce中最常见的是按行读取文本文件,而我们需要的是一次读取整个文件内容,然后在map方法中用jsoup解析内容。现将实现一次读取整个文件内容的代码贴出来。
实现RecordReader接口,最核心的就是处理好迭代多行文本的内容的逻辑,每次迭代通过调用nextKeyValue()方法来判断是否还有可读的文本行,直接设置当前的Key和Value,分别在方法getCurrentKey()和getCurrentValue()中返回对应的值。
更多信息请参考
1、定义一个FileInputFormat类
public class WholeFileInputFormat extends FileInputFormat<Text,Text>{ @Override public RecordReader<Text, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub RecordReader<Text,Text> recordReader = new WholeFileRecordReader(); return recordReader; } }
2、自定义RecordReader方法
public class WholeFileRecordReader extends RecordReader<Text,Text>{ private FileSplit fileSplit; private JobContext jobContext; private Text currentKey = new Text(); private Text currentValue = new Text(); private boolean finishConverting = false; @Override public void close() throws IOException { // TODO Auto-generated method stub } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return currentKey; } @Override public Text getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return currentValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub float progress = 0; if(finishConverting){ progress = 1; } return progress; } @Override public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { this.fileSplit = (FileSplit) arg0; this.jobContext = arg1; String filename = fileSplit.getPath().getName(); this.currentKey = new Text(filename); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub if(!finishConverting){ int len = (int)fileSplit.getLength(); // byte[] content = new byte[len]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(jobContext.getConfiguration()); FSDataInputStream in = fs.open(file); BufferedReader br = new BufferedReader(new InputStreamReader(in,"gbk")); // BufferedReader br = new BufferedReader(new InputStreamReader(in,"utf-8")); String line=""; String total=""; while((line= br.readLine())!= null){ total =total+line+"\n"; } br.close(); in.close(); fs.close(); currentValue = new Text(total); finishConverting = true; return true; } return false; } }
实现RecordReader接口,最核心的就是处理好迭代多行文本的内容的逻辑,每次迭代通过调用nextKeyValue()方法来判断是否还有可读的文本行,直接设置当前的Key和Value,分别在方法getCurrentKey()和getCurrentValue()中返回对应的值。
更多信息请参考
相关文章推荐
- 网上图书商城项目学习笔记-006验证码功能
- native app
- ubutntu apt 源
- 手动制作chm格式文档
- 浏览器窗口的高度和宽度,涵盖所有浏览器:(不包括工具栏/滚动条)
- CSU1664: 防水堤坝
- 两种对于Infowindow的操作
- Java JVM使用哪种编码格式
- osx下快捷键相应符号
- VMWare中安装windowsXP遇到的问题
- React组件属性类--propTypes
- VS2013的OSG开发环境配置
- Visual Studio 2015源文件编码问题
- collectionView 中cell间距设置建议
- 怎样用EDIUS实现视频的快速剪裁
- 什么是Objc
- 使用python打印所有汉字
- JSTL中EL表达式无法直接取size的处理
- jqgrid刷新列表,重新加载数据
- [DevExpress] GridControl添加右键菜单