Outputting multiple files from a Hadoop reducer
2010-10-22 14:07
Keywords: hadoop, mapreduce

Sometimes we want the following behavior: the reducer writes its output to multiple files based on the key (or value), so that all records with the same key (or value) land in the same file. In Hadoop 0.17.x this can be done by subclassing MultipleOutputFormat and overriding generateFileNameForKeyValue.

For example:

Java code:
package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
    extends MultipleOutputFormat<K, V> {

  private TextOutputFormat<K, V> theTextOutputFormat = null;

  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {
    // Lazily create the underlying TextOutputFormat and delegate writing to it.
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
  }

  @Override
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    // Append the value to the leaf file name, e.g. part-00000_4,
    // so every record with the same value goes to the same file.
    return name + "_" + value.toString();
  }
}
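The naming rule in generateFileNameForKeyValue can be sanity-checked without a cluster, since it is just string concatenation on the leaf file name. A minimal standalone sketch (the class name and the sample word counts below are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FileNamePreview {
    // Mirrors generateFileNameForKeyValue above: leaf name + "_" + value.
    static String fileNameFor(String name, String value) {
        return name + "_" + value;
    }

    public static void main(String[] args) {
        // Hypothetical (word, count) pairs as a wordcount reducer might emit them.
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        counts.put("the", 4);
        counts.put("of", 4);
        counts.put("hadoop", 2);

        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + " -> "
                + fileNameFor("part-00000", e.getValue().toString()));
        }
        // "the" and "of" both map to part-00000_4; "hadoop" maps to part-00000_2.
    }
}
```

Keying the name on the value (the count) is what produces the part-00000_N files shown later; returning key.toString() instead would give one file per word.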
To try this with the wordcount example, add one line to the run() method of WordCount.java:

conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);

i.e.:

Java code:
public int run(String[] args) throws Exception {
  JobConf conf = new JobConf(getConf(), WordCount.class);
  conf.setJobName("wordcount");

  // the keys are words (strings)
  conf.setOutputKeyClass(Text.class);
  // the values are counts (ints)
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(MapClass.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);
  conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);

  List<String> other_args = new ArrayList<String>();
  for (int i = 0; i < args.length; ++i) {
    try {
      if ("-m".equals(args[i])) {
        conf.setNumMapTasks(Integer.parseInt(args[++i]));
      } else if ("-r".equals(args[i])) {
        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
      } else {
        other_args.add(args[i]);
      }
    } catch (NumberFormatException except) {
      System.out.println("ERROR: Integer expected instead of " + args[i]);
      return printUsage();
    } catch (ArrayIndexOutOfBoundsException except) {
      System.out.println("ERROR: Required parameter missing from " +
          args[i - 1]);
      return printUsage();
    }
  }

  // Make sure there are exactly 2 parameters left.
  if (other_args.size() != 2) {
    System.out.println("ERROR: Wrong number of parameters: " +
        other_args.size() + " instead of 2.");
    return printUsage();
  }

  FileInputFormat.setInputPaths(conf, other_args.get(0));
  FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

  JobClient.runJob(conf);
  return 0;
}
Then run:

bin/hadoop jar build/hadoop-*-examples.jar wordcount conf wordcount_output

which writes its results into the directory wordcount_output:

Shell output:
$ ls wordcount_output/
part-00000_1 part-00000_13 part-00000_16 part-00000_214 part-00000_28 part-00000_38 part-00000_5 part-00000_8
part-00000_10 part-00000_14 part-00000_17 part-00000_22 part-00000_29 part-00000_4 part-00000_6 part-00000_9
part-00000_102 part-00000_141 part-00000_19 part-00000_23 part-00000_3 part-00000_42 part-00000_62
part-00000_11 part-00000_143 part-00000_2 part-00000_24 part-00000_31 part-00000_44 part-00000_63
part-00000_117 part-00000_15 part-00000_20 part-00000_25 part-00000_35 part-00000_46 part-00000_7
part-00000_12 part-00000_152 part-00000_21 part-00000_26 part-00000_36 part-00000_47 part-00000_70
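Since TextOutputFormat writes each record as key<TAB>value, every part-00000_N file above should contain only words whose count is N. A quick way to check the invariant (sketched here against a mock file, because the real paths and contents depend on your run):

```shell
# Build a mock output file in the same key<TAB>value layout TextOutputFormat uses.
dir=$(mktemp -d)
printf 'the\t4\nof\t4\n' > "$dir/part-00000_4"

# Every value column should equal the suffix of the file name (4 here).
awk -F'\t' '$2 != 4 { bad = 1 } END { exit bad }' "$dir/part-00000_4" \
  && echo "all counts match the file suffix"
```

Running the same awk check against a real file from wordcount_output (with the matching suffix) should likewise succeed.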