
Writing a custom FileInputFormat with the new Hadoop Map/Reduce API

2011-12-11 22:39
While reading "Hadoop in Action" I noticed that its sample code uses the old API, parts of which are already marked as Deprecated.

So I tried writing an example that accomplishes the same task with the new API.

The data format is as follows:

"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384

...

The purpose of the program is to swap the CITING and CITED values of every record, so that each CITED patent ends up with the list of patents that cite it.
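
For the sample rows above, the output should therefore look roughly like the following (TextOutputFormat separates key and value with a tab; note that the header line "CITING","CITED" is not filtered out by this code, so it will also appear, swapped, in the real output):

1324234	3858241
3398406	3858241
3557384	3858241
956203	3858241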

The MapReduce program:

package com;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    // The mapper simply swaps key and value: (CITING, CITED) -> (CITED, CITING).
    public static class MapClass extends Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, key);
        }
    }

    // The reducer joins all CITING patents of one CITED patent into a comma-separated list.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            String csv = "";
            for (Text value : values) {
                if (csv.length() > 0) csv += ",";
                csv += value.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner takes care of the generic configuration details (e.g. -D options).
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Pass the configuration prepared by ToolRunner on to the job.
        Job job = new Job(getConf());
        job.setJarByClass(MyJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the exit status instead of calling System.exit() here; main() already exits.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
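
Assuming the three classes are compiled and packaged into a jar (the jar name and paths below are only placeholders), the job can be launched with the usual hadoop jar command:

hadoop jar myjob.jar com.MyJob /user/hadoop/cite-input /user/hadoop/cite-output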

The MyInputFormat class:

package com;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyInputFormat extends FileInputFormat<Text, Text> {

    // A file may only be split when it is not compressed with a codec;
    // for a plain text file getCodec() returns null and the file is splittable.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader(context.getConfiguration());
    }
}

The MyRecordReader class (written with reference to KeyValueTextInputFormat in hadoop-0.23.0):
package com;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyRecordReader extends RecordReader<Text, Text> {

    // Reads whole lines; this class then splits each line at the first separator.
    private final LineRecordReader lineRecordReader;
    private byte separator = (byte) ',';

    private Text innerValue;
    private Text key;
    private Text value;

    public MyRecordReader(Configuration conf) {
        // conf is not used here, but would be the place to make the separator configurable.
        lineRecordReader = new LineRecordReader();
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(genericSplit, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        byte[] line = null;
        int lineLen = -1;

        // Read the next full line; its byte offset (the LineRecordReader key) is ignored.
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }

        if (line == null)
            return false;
        if (key == null)
            key = new Text();
        if (value == null)
            value = new Text();

        // Split the line at the first separator: key = text before it, value = text after it.
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    // Returns the index of the first occurrence of sep in utf[start, start+length), or -1.
    public int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); ++i) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    // If no separator was found, the whole line becomes the key and the value stays empty.
    public void setKeyValue(Text key, Text value, byte[] line,
            int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.set("");
        } else {
            key.set(line, 0, pos);
            value.set(line, pos + 1, lineLen - pos - 1);
        }
    }
}
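
As a quick sanity check of the parsing logic, the two helper methods can also be exercised outside of a job. The class below is only an illustrative sketch (SplitDemo is a name made up for this post) and assumes the classes above plus the Hadoop jars are on the classpath:

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class SplitDemo {
    public static void main(String[] args) {
        MyRecordReader reader = new MyRecordReader(new Configuration());
        byte[] line = "3858241,956203".getBytes();

        Text key = new Text();
        Text value = new Text();

        // Find the first ',' and split the line into key and value, as nextKeyValue() does.
        int pos = reader.findSeparator(line, 0, line.length, (byte) ',');
        reader.setKeyValue(key, value, line, line.length, pos);

        System.out.println(key + " -> " + value); // expected: 3858241 -> 956203
    }
}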