您的位置:首页 > 编程语言 > PHP开发

MR案例:多文件输出MultipleOutputs

2015-08-15 13:08 633 查看
问题描述:现有 ip-to-hosts.txt 数据文件,文件中每行数据有两个字段:分别是ip地址和该ip地址对应的国家,以'\t'分隔。要求汇总不同国家的IP数,并以国家名为文件名将其输出。解读:MultipleOutputs类

测试数据:ip-to-hosts.txt

18.217.167.70 United States
206.96.54.107 United States
196.109.151.139 Mauritius
174.52.58.113 United States
142.111.216.8 Canada


代码实现:

package country;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Ip2Hosts {
public static void main(String[] args) throws Exception {

//指定输入输出路径
args =new String[] {"hdfs://10.16.17.182:9000/test/in/ip-to-hosts.txt","hdfs://10.16.17.182:9000/test/out/0821/09"};
System.exit(run(args));
}

public static int run(String[] args) throws Exception {

Job job = Job.getInstance(new Configuration());
job.setJarByClass(Ip2Hosts.class);

job.setMapperClass(IPCountryMapper.class);
job.setReducerClass(IPCountryReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

/**
* 输出 08 和 09 需要调用此设置,07 就需要注释掉
*/
MultipleOutputs.addNamedOutput(job,"abc",TextOutputFormat.class,Text.class,IntWritable.class);

//通过此配置可以不再产生默认的空文件【part-*-00000】
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

return job.waitForCompletion(true) ? 1 : 0;

}
//map阶段
public static class IPCountryMapper    extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String[] splited = value.toString().split("\t");
context.write(new Text(splited[1]), new IntWritable(1));
}
}
//reduce阶段
public static class IPCountryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

//1.定义多文件输出类MultipleOutputs
private MultipleOutputs<Text, IntWritable> mos;

@Override
protected void setup(Context context
) throws IOException, InterruptedException {

//2.MultipleOutputs初始化
mos = new MultipleOutputs<Text, IntWritable>(context);
}

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException {
int total = 0;
for(IntWritable value: values) {
total += value.get();
}
        //3.调用MultipleOutputs中的write()方法
            //07-输出
mos.write(/*"abc",*/ key, new IntWritable(total),key.toString());

//08-输出
mos.write("abc", key, new IntWritable(total)/*,key.toString()*/);

//09-输出
mos.write("abc", key, new IntWritable(total),key.toString());
}

@Override
protected void cleanup(Context context
) throws IOException, InterruptedException {

//4.关闭流资源
mos.close();
}
}

}


代码解读:

1).输出-07所调用的方法和对应的输出结果:

/**
* @ 输出的key类型
* @ 输出的value类型
* @ 输出的基路径,实际输出结果为:'基路径-r-00000'
*/
MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)




2).输出-08所调用的方法和对应的输出结果:

/**
* @ 自定义的输出.对于不指定'基路径',则结果为:'自定义的输出-r-00000'
* @ 输出的key类型
* @ 输出的value类型
*/
MultipleOutputs.write(String namedOutput, K key, V value)



3).输出-09所调用的方法和对应的输出结果:

/**
* @ 自定义的输出.
* @ 输出的key类型
* @ 输出的value类型
* @ 输出的基路径,指定输出'基路径',则结果为:'基路径-r-00000'
*/
MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath)




用法总结:

在Mapper或Reducer类中创建 MultipleOutputs 成员变量 mos

在setup()方法中初始化 mos 变量,

在map()或reduce()方法中调用 mos.write() 方法输出数据,代替context.write()

mos.write() 方法具有三个重载,对于 输出-08-09 还需在Job配置中指定输出格式

在cleanup()方法中调用 mos.close() 方法关闭输出流
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: