Hadoop Map/Reduce 新API中自己的FileInputFormat写法
2011-12-11 22:39
363 查看
在看《Hadoop in Action》时发现代码使用的是旧的API,且部分API已经标记为Deprecated。
所以自己尝试着写了一个使用新API的例子来完成该代码的功能。
数据格式如下:
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
...
程序的目的是将所有数据的CITING和CITED值反过来输出。
MapReduce程序:
package com;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for a job that inverts (CITING, CITED) patent-citation pairs.
 * Input records are parsed by {@code MyInputFormat} into (key, value) Text
 * pairs; the mapper swaps them and the reducer joins all citing patents for
 * each cited patent into one comma-separated line.
 */
public class MyJob extends Configured implements Tool {
    /** Mapper that swaps key and value so output is keyed by CITED. */
    public static class MapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (CITED, CITING) — the inverse of the input pair.
            context.write(value, key);
        }
    }

    /** Reducer that joins all values for a key into a comma-separated string. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // StringBuilder instead of repeated String '+' concatenation,
            // which is O(n^2) over the number of values per key.
            StringBuilder csv = new StringBuilder();
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv.append(',');
                }
                csv.append(value.toString());
            }
            context.write(key, new Text(csv.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic Hadoop options (-D, -files, ...) and
        // hands the resulting Configuration to run() via Configured.getConf().
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }

    /**
     * Configures and submits the job.
     *
     * @param arg0 arg0[0] is the input path, arg0[1] the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Use the Configuration supplied by ToolRunner; a bare `new Job()`
        // would silently discard any command-line -D options.
        Job job = new Job(getConf());
        job.setJarByClass(MyJob.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Return the status instead of calling System.exit() inside run():
        // exiting here bypassed ToolRunner and made the trailing return dead.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
MyInputFormat类:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.MyRecordReader;
/**
 * Input format producing (Text, Text) records; each line is split into a
 * key and a value by {@link MyRecordReader}.
 */
public class MyInputFormat extends FileInputFormat<Text, Text> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // A compressed file cannot be split at arbitrary byte offsets,
        // so only allow splitting when no codec matches the file name.
        CompressionCodecFactory factory =
                new CompressionCodecFactory(context.getConfiguration());
        return factory.getCodec(file) == null;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MyRecordReader(context.getConfiguration());
    }
}
MyRecordReader类:(参照KeyValueTextInputFormat(hadoop-0.23.0)写成)
package com;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * RecordReader that wraps a {@link LineRecordReader} and splits each line
 * into a (key, value) Text pair at the first occurrence of a separator byte.
 * Modeled on KeyValueTextInputFormat's line reader (hadoop-0.23.0).
 */
public class MyRecordReader extends RecordReader<Text,Text> {
    /** Configuration key for the single-character key/value separator. */
    public static final String KEY_VALUE_SEPARATOR =
            "myrecordreader.key.value.separator";

    private final LineRecordReader lineRecordReader;
    // Separator byte; defaults to ',' but is now configurable (the original
    // code accepted a Configuration and then ignored it entirely).
    private final byte separator;
    private Text innerValue;
    private Text key;
    private Text value;

    /**
     * @param conf job configuration; {@link #KEY_VALUE_SEPARATOR} may set a
     *             custom separator character (default ",")
     */
    public MyRecordReader(Configuration conf) {
        lineRecordReader = new LineRecordReader();
        String sep = conf.get(KEY_VALUE_SEPARATOR, ",");
        this.separator = (byte) sep.charAt(0);
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Delegate progress reporting to the underlying line reader.
        return lineRecordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(genericSplit, context);
    }

    /**
     * Advances to the next line and splits it at the first separator byte.
     *
     * @return true if a record was read, false at end of split
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            return false;
        }
        innerValue = lineRecordReader.getCurrentValue();
        // NOTE: Text.getBytes() may return a backing array longer than the
        // logical content, so always pair it with getLength().
        byte[] line = innerValue.getBytes();
        int lineLen = innerValue.getLength();
        if (line == null) {
            return false;
        }
        // Lazily allocate the output Text objects and reuse them per record.
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    /**
     * Returns the index of the first occurrence of {@code sep} in
     * {@code utf[start, start+length)}, or -1 if absent.
     */
    public int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); ++i) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Fills {@code key}/{@code value} from {@code line}: everything before
     * {@code pos} is the key, everything after it is the value. When no
     * separator was found ({@code pos == -1}) the whole line becomes the key
     * and the value is empty.
     */
    public void setKeyValue(Text key, Text value, byte[] line,
            int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.set("");
        } else {
            key.set(line, 0, pos);
            value.set(line, pos + 1, lineLen - pos - 1);
        }
    }
}
所以自己尝试着写了一个使用新API的例子来完成该代码的功能。
数据格式如下:
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
...
程序的目的是将所有数据的CITING和CITED值反过来输出。
MapReduce程序:
package com;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for a job that inverts (CITING, CITED) patent-citation pairs.
 * Input records are parsed by {@code MyInputFormat} into (key, value) Text
 * pairs; the mapper swaps them and the reducer joins all citing patents for
 * each cited patent into one comma-separated line.
 */
public class MyJob extends Configured implements Tool {
    /** Mapper that swaps key and value so output is keyed by CITED. */
    public static class MapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (CITED, CITING) — the inverse of the input pair.
            context.write(value, key);
        }
    }

    /** Reducer that joins all values for a key into a comma-separated string. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // StringBuilder instead of repeated String '+' concatenation,
            // which is O(n^2) over the number of values per key.
            StringBuilder csv = new StringBuilder();
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv.append(',');
                }
                csv.append(value.toString());
            }
            context.write(key, new Text(csv.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic Hadoop options (-D, -files, ...) and
        // hands the resulting Configuration to run() via Configured.getConf().
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }

    /**
     * Configures and submits the job.
     *
     * @param arg0 arg0[0] is the input path, arg0[1] the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Use the Configuration supplied by ToolRunner; a bare `new Job()`
        // would silently discard any command-line -D options.
        Job job = new Job(getConf());
        job.setJarByClass(MyJob.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Return the status instead of calling System.exit() inside run():
        // exiting here bypassed ToolRunner and made the trailing return dead.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
MyInputFormat类:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.MyRecordReader;
/**
 * Input format producing (Text, Text) records; each line is split into a
 * key and a value by {@link MyRecordReader}.
 */
public class MyInputFormat extends FileInputFormat<Text, Text> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // A compressed file cannot be split at arbitrary byte offsets,
        // so only allow splitting when no codec matches the file name.
        CompressionCodecFactory factory =
                new CompressionCodecFactory(context.getConfiguration());
        return factory.getCodec(file) == null;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MyRecordReader(context.getConfiguration());
    }
}
MyRecordReader类:(参照KeyValueTextInputFormat(hadoop-0.23.0)写成)
package com;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * RecordReader that wraps a {@link LineRecordReader} and splits each line
 * into a (key, value) Text pair at the first occurrence of a separator byte.
 * Modeled on KeyValueTextInputFormat's line reader (hadoop-0.23.0).
 */
public class MyRecordReader extends RecordReader<Text,Text> {
    /** Configuration key for the single-character key/value separator. */
    public static final String KEY_VALUE_SEPARATOR =
            "myrecordreader.key.value.separator";

    private final LineRecordReader lineRecordReader;
    // Separator byte; defaults to ',' but is now configurable (the original
    // code accepted a Configuration and then ignored it entirely).
    private final byte separator;
    private Text innerValue;
    private Text key;
    private Text value;

    /**
     * @param conf job configuration; {@link #KEY_VALUE_SEPARATOR} may set a
     *             custom separator character (default ",")
     */
    public MyRecordReader(Configuration conf) {
        lineRecordReader = new LineRecordReader();
        String sep = conf.get(KEY_VALUE_SEPARATOR, ",");
        this.separator = (byte) sep.charAt(0);
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Delegate progress reporting to the underlying line reader.
        return lineRecordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(genericSplit, context);
    }

    /**
     * Advances to the next line and splits it at the first separator byte.
     *
     * @return true if a record was read, false at end of split
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            return false;
        }
        innerValue = lineRecordReader.getCurrentValue();
        // NOTE: Text.getBytes() may return a backing array longer than the
        // logical content, so always pair it with getLength().
        byte[] line = innerValue.getBytes();
        int lineLen = innerValue.getLength();
        if (line == null) {
            return false;
        }
        // Lazily allocate the output Text objects and reuse them per record.
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    /**
     * Returns the index of the first occurrence of {@code sep} in
     * {@code utf[start, start+length)}, or -1 if absent.
     */
    public int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); ++i) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Fills {@code key}/{@code value} from {@code line}: everything before
     * {@code pos} is the key, everything after it is the value. When no
     * separator was found ({@code pos == -1}) the whole line becomes the key
     * and the value is empty.
     */
    public void setKeyValue(Text key, Text value, byte[] line,
            int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.set("");
        } else {
            key.set(line, 0, pos);
            value.set(line, pos + 1, lineLen - pos - 1);
        }
    }
}
相关文章推荐
- Hadoop Map/Reduce InputFormat基础
- Hadoop MapReduce处理海量小文件(每次整个小文件整体读入到map):基于FileInputFormat
- hadoop old API CombineFileInputFormat
- Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)
- 自定义hadoop map/reduce输入文件切割InputFormat
- Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)
- Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat(整个小文件读入到map中)
- Hadoop Map/Reduce 新API中自己的FileInputFormat写法
- 自定义hadoop map/reduce输入文件切割InputFormat 更改输入value的分隔符
- 自定义hadoop map/reduce输入文件切割InputFormat
- Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat(每次往map中读入1行)
- 自定义hadoop map/reduce输入文件切割InputFormat
- 自定义hadoop map/reduce输入文件切割InputFormat
- 自定义hadoop map/reduce输入文件切割InputFormat 更改输入value的分隔符
- spatialhadoop2.3源码阅读(九) ShapeLineInputFormat & ShapeLineRecordReader & SpatialRecordReader[FileMBR]
- eclipse+hadoop2.7.5的map-reduce的API的配置
- hadoop的archive归档和CombineFileInputFormat的使用
- Hadoop_FileInputFormat分片
- Hadoop CombineFileInputFormat原理说明(转)
- Hadoop使用CombineFileInputFormat处理大量小文件接口实现(Hadoop-1.0.4)