MapReduce-MulitipleOutputs实现自己定义输出到多个文件夹
2017-04-27 13:54
232 查看
输入源数据例子:
Source1开头的数据属于集合A。
Source2开头的数据属于集合B;
Source3开头的数据即属于集合A,也属于集合B。
输出要求:
完整保留集合A数据(包括Source1、Source3开头数据)
完整保留集合B数据(包括Source2、Source3开头数据)
程序实现:
使用hadoop jar命令调度执行jar包代码:
程序执行以后,输出的结果:
补充于2014-12-18:
这样的方式的缺陷是会产生非常多类似Source1或Source2开头的子文件,一种非常好的方式就是指定baseOutputPath,将Source1开头的文件放在同一个文件夹中管理
对上述代码进行改写实现文件夹管理:
能够參考下:http://dirlt.com/mapred.html
Source1-0001 Source2-0002 Source1-0003 Source2-0004 Source1-0005 Source2-0006 Source3-0007 Source3-0008描写叙述:
Source1开头的数据属于集合A。
Source2开头的数据属于集合B;
Source3开头的数据即属于集合A,也属于集合B。
输出要求:
完整保留集合A数据(包括Source1、Source3开头数据)
完整保留集合B数据(包括Source2、Source3开头数据)
程序实现:
import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.mahout.common.AbstractJob; import com.yhd.common.util.HadoopUtil; /** * AbstractJob 是mahout的Job模板,能够不使用该模板, * 实则的核心部分在于MultipleOutputs部分 * * @author ouyangyewei * */ public class TestMultipleOutputsJob extends AbstractJob { @Override public int run(String[] args) throws Exception { addInputOption(); addOutputOption(); Map<String, List<String>> parseArgs = parseArguments(args); if(parseArgs==null){ return -1; } HadoopUtil.delete(getConf(), getOutputPath()); Configuration conf = new Configuration(); conf.setInt("mapred.reduce.tasks", 4); conf.set("mapred.job.queue.name", "pms"); conf.set("mapred.child.java.opts", "-Xmx3072m"); conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05"); Job job = new Job(new Configuration(conf)); job.setJobName("TestMultipleOutputsJob"); job.setJarByClass(TestMultipleOutputsJob.class); job.setMapperClass(MultipleMapper.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, this.getInputPath()); FileOutputFormat.setOutputPath(job, this.getOutputPath()); /** 输出文件格式将为:Source1-m-**** */ MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class); /** 输出文件格式将为:Source2-m-**** */ MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class); boolean suceeded = job.waitForCompletion(true); if(!suceeded) { return -1; } return 0; } /** * * @author ouyangyewei * */ public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> { private MultipleOutputs<Text, Text> mos = null; @Override protected void setup(Context context ) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, Text>(context); } public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); String[] tokenizer = line.split("-"); if (tokenizer[0].equals("Source1")) { /** 集合A的数据 */ mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]); } else if (tokenizer[0].equals("Source2")) { /** 集合B的数据 */ mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]); } /** 集合A交集合B的数据 */ if (tokenizer[0].equals("Source3")) { mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]); mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]); } } protected void cleanup(Context context ) throws IOException, InterruptedException { mos.close(); } } /** * @param args */ public static void main(String[] args) { System.setProperty("javax.xml.parsers.DocumentBuilderFactory", "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"); System.setProperty("javax.xml.parsers.SAXParserFactory", "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"); TestMultipleOutputsJob instance = new TestMultipleOutputsJob(); try { instance.run(args); } catch (Exception e) { e.printStackTrace(); } } }
使用hadoop jar命令调度执行jar包代码:
hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob \ --input /user/pms/workspace/ouyangyewei/testMultipleOutputs \ --output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
程序执行以后,输出的结果:
[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output Found 4 items -rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000 -rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000 -rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS -rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000 Source1 0001 Source1 0003 Source1 0005 Source3 0007 Source3 0008 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000 Source2 0002 Source2 0004 Source2 0006 Source3 0007 Source3 0008
补充于2014-12-18:
这样的方式的缺陷是会产生非常多类似Source1或Source2开头的子文件,一种非常好的方式就是指定baseOutputPath,将Source1开头的文件放在同一个文件夹中管理
对上述代码进行改写实现文件夹管理:
public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); String[] tokenizer = line.split("-"); if (tokenizer[0].equals("Source1")) { /** 集合A的数据 */ mos.write("Source1", new Text(tokenizer[0]), tokenizer[1], "Source1/part"); } else if (tokenizer[0].equals("Source2")) { /** 集合B的数据 */ mos.write("Source2", new Text(tokenizer[0]), tokenizer[1], "Source2/part"); } /** 集合A交集合B的数据 */ if (tokenizer[0].equals("Source3")) { mos.write("Source1", new Text(tokenizer[0]), tokenizer[1], "Source1/part"); mos.write("Source2", new Text(tokenizer[0]), tokenizer[1], "Source2/part"); } }程序执行以后,输出的结果:
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output Found 4 items -rw-r--r-- 3 pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/_SUCCESS -rw-r--r-- 3 pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/part-r-00000 drwxr-xr-x - pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1 drwxr-xr-x - pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack] $hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1 Found 1 items -rw-r--r-- 3 pms pms 65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1/part-r-00000 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack] $hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2 Found 1 items -rw-r--r-- 3 pms pms 65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2/part-r-00000
能够參考下:http://dirlt.com/mapred.html
相关文章推荐
- 输出带有是/否的对话框,实现效果是:点击是,继续添加内容,点击否,返回自己定义的页面
- C++ Recipes 自己实现输出重定向
- 实现自己定义的IP头
- 九周任务二:定义Time类中的<<和>>运算符重载,实现时间的输入输出
- C++第九周【任务二】定义Time类中的<<和>>运算符重载,实现时间的输入输出
- C++第九周【任务一】定义Complex类中的<<和>>运算符的重载,实现输入和输出
- 【C++源代码】文件夹比较、文件比较的数据结构,定义和实现
- 实现mapreduce多文件自定义输出
- 第九周实验报告 任务2 定义Complex类中的<<和>>运算符的重载,实现输入和输出,改造原程序中对运算结果显示方式,使程序读起来更自然。
- 《第九周任务一》定义Complex类中的《《和》》运算符的重载,实现输入和输出,改造原程序中对运算结果显示方式,使程序读起来更自然
- popen函数实现捕获shell里命令的输出--外加自己的一个小接口函数
- 自己写的debug类 实现变更输出
- 把文件夹下所有后缀为自己--想要的文件名称--全部输出出来
- 配置实现-列表定义,编辑需自己开发的扩展实现举例(源代码)
- 第九周任务二之定义Time类中的<<和>>运算符重载,实现时间的输入输出
- .net2.0 中自己定义配置的实现
- Mapreduce如何实现自己的InputFormat
- 第九周实验任务二--定义Complex类中的<<和>>运算符的重载,实现输入和输出,使程序读起来更自然
- 《C++第九周实验报告2-1》---接第8周任务2,定义Time类中的<<和>>运算符重载,实现时间的输入输出