
Writing multiple output files from a Hadoop reducer

2010-10-22 14:07


Keywords: hadoop, mapreduce
Sometimes we want the reducer to write its output to multiple files based on the key (or value), with all records that share the same key (or value) going into the same file. As of Hadoop 0.17.x, you can get this behavior by subclassing MultipleOutputFormat and overriding generateFileNameForKeyValue.

For example (note that this listing reuses Hadoop's own package name, org.apache.hadoop.mapred.lib; in your own project you would normally place the subclass in your own package):

Java code:




package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
        extends MultipleOutputFormat<K, V> {

    private TextOutputFormat<K, V> theTextOutputFormat = null;

    @Override
    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
            String name, Progressable arg3) throws IOException {
        // Lazily create a single TextOutputFormat and delegate the actual
        // record writing to it; this class only decides on file names.
        if (theTextOutputFormat == null) {
            theTextOutputFormat = new TextOutputFormat<K, V>();
        }
        return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
    }

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        // Append the value to the leaf file name (e.g. part-00000_5), so every
        // record with the same value lands in the same output file.
        return name + "_" + value.toString();
    }
}
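The same hook can group by key instead of value. Below is a minimal sketch (not from the original post; it assumes key.toString() yields something safe to embed in a file name) that would replace generateFileNameForKeyValue in the class above:

@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
    // Hypothetical variant: append the key, so all records sharing a key
    // end up in the same file, e.g. part-00000_hello.
    return name + "_" + key.toString();
}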


To try this out with the wordcount example, add one line to the run function of WordCount.java (the complete function is shown below):
conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);


Java code:




public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    // The one added line: route output through the multi-file output format.
    conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);

    List<String> other_args = new ArrayList<String>();
    for (int i = 0; i < args.length; ++i) {
        try {
            if ("-m".equals(args[i])) {
                conf.setNumMapTasks(Integer.parseInt(args[++i]));
            } else if ("-r".equals(args[i])) {
                conf.setNumReduceTasks(Integer.parseInt(args[++i]));
            } else {
                other_args.add(args[i]);
            }
        } catch (NumberFormatException except) {
            System.out.println("ERROR: Integer expected instead of " + args[i]);
            return printUsage();
        } catch (ArrayIndexOutOfBoundsException except) {
            System.out.println("ERROR: Required parameter missing from " +
                args[i - 1]);
            return printUsage();
        }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
        System.out.println("ERROR: Wrong number of parameters: " +
            other_args.size() + " instead of 2.");
        return printUsage();
    }
    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);
    return 0;
}
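Note that the name argument passed to generateFileNameForKeyValue is the per-reducer part file name (part-00000, part-00001, ...). So if you run with more than one reducer via the -r flag handled above, for example:

bin/hadoop jar build/hadoop-*-examples.jar wordcount -r 2 conf wordcount_output

then each count would be expected to split across per-reducer files such as part-00000_5 and part-00001_5, since the words hash-partition across the two reducers.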


Then run:
bin/hadoop jar build/hadoop-*-examples.jar wordcount conf wordcount_output
This produces the output directory wordcount_output. Because generateFileNameForKeyValue appends the output value (the count), each part-00000_N file holds exactly the words that occurred N times:

Shell output:



$ ls wordcount_output/

part-00000_1 part-00000_13 part-00000_16 part-00000_214 part-00000_28 part-00000_38 part-00000_5 part-00000_8

part-00000_10 part-00000_14 part-00000_17 part-00000_22 part-00000_29 part-00000_4 part-00000_6 part-00000_9

part-00000_102 part-00000_141 part-00000_19 part-00000_23 part-00000_3 part-00000_42 part-00000_62

part-00000_11 part-00000_143 part-00000_2 part-00000_24 part-00000_31 part-00000_44 part-00000_63

part-00000_117 part-00000_15 part-00000_20 part-00000_25 part-00000_35 part-00000_46 part-00000_7

part-00000_12 part-00000_152 part-00000_21 part-00000_26 part-00000_36 part-00000_47 part-00000_70
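
To spot-check the result (under the suffix-equals-count interpretation above), you could cat one of the files; every line in it should carry the same count, e.g.:

$ cat wordcount_output/part-00000_5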