通过MultipleOutputs写到多个文件
2016-05-02 19:46
435 查看
MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突
1、项目需求
假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。
2、数据集
wolys@21cn.com
zss1984@126.com
294522652@qq.com
simulateboy@163.com
zhoushigang_123@163.com
sirenxing424@126.com
lixinyu23@qq.com
chenlei1201@gmail.com
370433835@qq.com
cxx0409@126.com
viv093@sina.com
q62148830@163.com
65993266@qq.com
summeredison@sohu.com
zhangbao-autumn@163.com
diduo_007@yahoo.com.cn
fxh852@163.com
3、实现
4、运行效果
5、注意事项
1、在reducer中调用时,要调用MultipleOutputs以下接口
public void write(KEYOUT key,VALUEOUT value, String baseOutputPath) throws IOException,InterruptedException
如果调用
public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException
则需要在job中,预先声明named output(如下),不然会报错:named output xxx not defined:
2. 默认情况下,输出目录会生成part-r-00000或者part-m-00000的空文件,需要如下设置后,才不会生成
就是去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
3. multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符“/”,那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);如果包含文件分隔符“/”,例如baseOutputPath=“029070-99999/1901/part”,那么输出文件则为
如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
实现代码及数据:下载
1、项目需求
假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。
2、数据集
wolys@21cn.com
zss1984@126.com
294522652@qq.com
simulateboy@163.com
zhoushigang_123@163.com
sirenxing424@126.com
lixinyu23@qq.com
chenlei1201@gmail.com
370433835@qq.com
cxx0409@126.com
viv093@sina.com
q62148830@163.com
65993266@qq.com
summeredison@sohu.com
zhangbao-autumn@163.com
diduo_007@yahoo.com.cn
fxh852@163.com
3、实现
package com.buaa; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName MultipleOutputsDemo * @PackageName com.buaa * @ClassName EmailMultipleOutputsDemo * @Description 统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下 * @Author 刘吉超 * @Date 2016-05-02 15:25:18 */ public class EmailMultipleOutputsDemo extends Configured implements Tool { public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, one); } } public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private MultipleOutputs<Text, IntWritable> multipleOutputs; @Override protected void setup(Context context) throws IOException ,InterruptedException{ multipleOutputs = new MultipleOutputs< Text, IntWritable>(context); } protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException { // 开始位置 int begin = Key.toString().indexOf("@"); // 结束位置 int end = Key.toString().indexOf("."); if(begin >= end){ return; } // 获取邮箱类别,比如 qq String name = Key.toString().substring(begin+1, end); int sum = 0; for (IntWritable value : Values) { sum += value.get(); } /* * multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。 * 如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn); * 如果包含文件分隔符"/",例如baseOutputPath="029070-99999/1901/part",那么输出文件则为029070-99999/1901/part-r-nnnnn */ multipleOutputs.write(Key, new IntWritable(sum), name); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException{ multipleOutputs.close(); } } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { // 读取配置文件 Configuration conf = new Configuration(); // 判断目录是否存在,如果存在,则删除 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } // 新建一个任务 Job job = new Job(conf, "MultipleDemo"); // 主类 job.setJarByClass(EmailMultipleOutputsDemo.class); // 输入路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // Mapper job.setMapperClass(EmailMapper.class); // Reducer job.setReducerClass(EmailReducer.class); // key输出类型 job.setOutputKeyClass(Text.class); // value输出类型 job.setOutputValueClass(IntWritable.class); // 去掉job设置outputFormatClass,改为通过LazyOutputFormat设置 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args0) throws Exception { // 数据输入路径和输出路径 // String[] args0 = { // "hdfs://ljc:9000/buaa/email/email.txt", // "hdfs://ljc:9000/buaa/email/out/" // }; int ec = ToolRunner.run(new Configuration(), new EmailMultipleOutputsDemo(), args0); System.exit(ec); } }
4、运行效果
5、注意事项
1、在reducer中调用时,要调用MultipleOutputs以下接口
public void write(KEYOUT key,VALUEOUT value, String baseOutputPath) throws IOException,InterruptedException
如果调用
public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException
则需要在job中,预先声明named output(如下),不然会报错:named output xxx not defined:
MultipleOutputs.addNamedOutput(job, "moshouzhengba", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "maoxiandao", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "yingxionglianmen", TextOutputFormat.class, Text.class, Text.class);
2. 默认情况下,输出目录会生成part-r-00000或者part-m-00000的空文件,需要如下设置后,才不会生成
// job.setOutputFormatClass(TextOutputFormat.class); LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
就是去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
3. multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符“/”,那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);如果包含文件分隔符“/”,例如baseOutputPath=“029070-99999/1901/part”,那么输出文件则为
如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
实现代码及数据:下载
相关文章推荐
- yii2.0 自带验证码项目总结
- PHP递归函数经典算法(斐波那契/阶乘/高斯算法)
- php正则表达式和数组
- PHP基础递归文件目录
- php实现堆排序
- php命名空间里面的use关键字的理解
- PHP命名空间的理解
- Ubuntu下vsftpd - 虚拟账户配置
- php实现二路归并排序
- ftp服务在iptables防火墙设置
- PHPSTORM_设置配置优化
- pureftp部署和优化
- 如何调试PHP程序
- php限制文件下载速度的代码
- PHP大量数据循环时内存耗尽问题的解决方案(适用于导出大量数据时内存耗尽情况)
- PHP加密代码
- php显示动态时间源代码
- PHP中Web Service应用
- 深入学习PHP内核
- 设置IIS7/IIS7.5的FTP支持断点续传