您的位置:首页 > 编程语言 > PHP开发

MultipleOutputs 新旧 API（旧版 org.apache.hadoop.mapred 与新版 org.apache.hadoop.mapreduce）

2015-06-06 16:46 501 查看
package MRNB_V4;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Old-API ({@code org.apache.hadoop.mapred}) example: a map-only job that splits its input into
 * one output sub-directory per country code, using {@link MultipleTextOutputFormat}.
 *
 * <p>Usage: {@code MultipleOutputs <input path> <output path>}.
 *
 * <p>Note: despite the name, this class is unrelated to Hadoop's
 * {@code org.apache.hadoop.mapreduce.lib.output.MultipleOutputs}.
 */
public class MultipleOutputs extends Configured implements Tool {

    /** Passes every input line through unchanged, keyed by {@link NullWritable}. */
    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Text> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(NullWritable.get(), value);
        }
    }

    /**
     * Output format that routes each record to {@code <country>/<default file name>}.
     *
     * <p>{@link MultipleTextOutputFormat} extends {@code MultipleOutputFormat}; overriding
     * {@code generateFileNameForKeyValue} is what classifies records into separate output files.
     * The key type is {@link NullWritable}, and the value is the raw text line.
     */
    public static class PartitionByCountryMTOF
            extends MultipleTextOutputFormat<NullWritable, Text> {

        /** Directory used for records whose country field is missing, too short, or unsafe. */
        static final String UNKNOWN_COUNTRY = "unknown";

        /** Zero-based index of the country column in a comma-separated record. */
        private static final int COUNTRY_FIELD = 4;

        @Override
        protected String generateFileNameForKeyValue(NullWritable key,
                Text value, String filename) {
            return countryOf(value.toString()) + "/" + filename;
        }

        /**
         * Returns characters 1..2 of the fifth comma-separated field of {@code line}. For a
         * quoted value such as {@code "US"}, this strips the opening quote and yields {@code US}.
         *
         * @param line one input record
         * @return the two-character country code, or {@link #UNKNOWN_COUNTRY} when the field is
         *     absent, shorter than 3 characters, or would not be a safe single path component
         */
        static String countryOf(String line) {
            String[] fields = line.split(",", -1);
            if (fields.length <= COUNTRY_FIELD || fields[COUNTRY_FIELD].length() < 3) {
                // Previously this threw an index exception and failed the whole job.
                return UNKNOWN_COUNTRY;
            }
            String code = fields[COUNTRY_FIELD].substring(1, 3);
            // The code becomes a directory name; do not let record data escape the output dir.
            if (code.contains("/") || code.equals("..")) {
                return UNKNOWN_COUNTRY;
            }
            return code;
        }
    }

    /**
     * Configures and runs the map-only job (no reducer is used).
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} is the output path
     * @return 0 on success, 2 when the arguments are missing
     * @throws Exception if job submission fails; {@code JobClient.runJob} throws an
     *     {@link IOException} when the job does not succeed
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MultipleOutputs <input path> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        JobConf job = new JobConf(getConf(), MultipleOutputs.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJobName("MultipleOutputs");
        job.setMapperClass(MapClass.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(PartitionByCountryMTOF.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Map-only: mapper output goes straight to PartitionByCountryMTOF.
        job.setNumReduceTasks(0);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MultipleOutputs(), args);
        System.exit(res);
    }
}


  

package MRNB_V4;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

public static class MapClass extends
Mapper<LongWritable, Text, Text, IntWritable> {

private MultipleOutputs<Text, IntWritable> mos;

protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs<Text, IntWritable>(context);
}

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split("-");

//mos.write("MOSInt", new Text(tokens[0]),new IntWritable(Integer.parseInt(tokens[1]))); // (第一种)
//mos.write("MOSText", new Text(tokens[0]), tokens[2]); // 第二种
mos.write("mlj", new Text(tokens[0]), line, tokens[0] + "/");// 第三种 同时也可写到指定的文件或文件夹中
}

protected void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}

public int run(String[] args) throws Exception {

Configuration conf = getConf();

Job job = new Job(conf, "word count with MultipleOutputs");

job.setJarByClass(TestwithMultipleOutputs.class);

/*Path in = new Path(args[0]);
Path out = new Path(args[1]);*/
final String Input_path="hdfs://mlj:9000/hive";
final String Out_path="hdfs://mlj:9000/hive_out";

FileInputFormat.setInputPaths(job, Input_path);
FileOutputFormat.setOutputPath(job, new Path(Out_path));

job.setMapperClass(MapClass.class);
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class,Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "mlj", TextOutputFormat.class,Text.class, Text.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new Configuration(),new TestwithMultipleOutputs(), args);
System.exit(res);
}
}


  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: