您的位置:首页 > 编程语言 > PHP开发

MapReduce-MultipleOutputs实现自定义输出到多个文件夹

2017-04-27 13:54 232 查看
输入源数据例子:

Source1-0001
Source2-0002
Source1-0003
Source2-0004
Source1-0005
Source2-0006
Source3-0007
Source3-0008
描述:

Source1开头的数据属于集合A。

Source2开头的数据属于集合B;

Source3开头的数据既属于集合A,也属于集合B。

输出要求:

完整保留集合A数据(包括Source1、Source3开头数据)

完整保留集合B数据(包括Source2、Source3开头数据)

程序实现:

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.mahout.common.AbstractJob;

import com.yhd.common.util.HadoopUtil;

/**
 * Map-only job that demonstrates routing records to several named outputs
 * with {@link MultipleOutputs}.
 *
 * <p>{@code AbstractJob} is Mahout's job template; using it is optional. The
 * essential part of this example is the {@code MultipleOutputs} handling.
 *
 * <p>Each input line is expected to look like {@code <source>-<payload>}:
 * <ul>
 *   <li>{@code Source1} records are written to the named output {@code Source1} (set A);</li>
 *   <li>{@code Source2} records are written to the named output {@code Source2} (set B);</li>
 *   <li>{@code Source3} records are written to both named outputs.</li>
 * </ul>
 *
 * @author ouyangyewei
 */
public class TestMultipleOutputsJob extends AbstractJob {

    /**
     * Parses the input/output options, configures the map-only job and runs it.
     *
     * @param args command-line arguments (expects the input and output options)
     * @return {@code 0} when the job succeeds, {@code -1} when the arguments
     *         cannot be parsed or the job fails
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();

        Map<String, List<String>> parsedArgs = parseArguments(args);
        if (parsedArgs == null) {
            return -1;
        }

        HadoopUtil.delete(getConf(), getOutputPath());

        // Start from the tool's own configuration (when one is set) so that
        // options supplied by the caller are not silently discarded.
        Configuration base = getConf();
        Configuration conf = (base == null) ? new Configuration() : new Configuration(base);
        // No "mapred.reduce.tasks" here: the job is map-only and
        // setNumReduceTasks(0) below would override it anyway.
        conf.set("mapred.job.queue.name", "pms");
        conf.set("mapred.child.java.opts", "-Xmx3072m");
        conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05");

        // Job.getInstance takes its own copy of the configuration.
        Job job = Job.getInstance(conf, "TestMultipleOutputsJob");
        job.setJarByClass(TestMultipleOutputsJob.class);
        job.setMapperClass(MultipleMapper.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, getInputPath());
        FileOutputFormat.setOutputPath(job, getOutputPath());

        // Output files will be named: Source1-m-****
        MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
        // Output files will be named: Source2-m-****
        MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    /**
     * Mapper that splits each line into {@code <source>-<payload>} and writes
     * the pair to the named output(s) the source belongs to.
     *
     * @author ouyangyewei
     */
    public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> {
        private MultipleOutputs<Text, Text> mos = null;

        /** Reused output key, to avoid allocating a new object per record. */
        private final Text outKey = new Text();
        /** Reused output value, to avoid allocating a new object per record. */
        private final Text outValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        /**
         * Routes one input line to the matching named output(s).
         *
         * <p>Lines without a {@code -} separator are counted under the
         * {@code MALFORMED_LINES} counter and skipped instead of failing the
         * task. Lines whose source is not Source1/Source2/Source3 are ignored.
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Limit 2: split only on the first '-', so a payload that itself
            // contains '-' is kept whole.
            String[] parts = value.toString().split("-", 2);
            if (parts.length < 2) {
                context.getCounter("TestMultipleOutputsJob", "MALFORMED_LINES").increment(1);
                return;
            }

            String source = parts[0];
            outKey.set(source);
            outValue.set(parts[1]);

            // Source3 belongs to both sets, so it is written to both outputs.
            boolean inBoth = "Source3".equals(source);
            if (inBoth || "Source1".equals(source)) {
                // Data of set A.
                mos.write("Source1", outKey, outValue);
            }
            if (inBoth || "Source2".equals(source)) {
                // Data of set B.
                mos.write("Source2", outKey, outValue);
            }
        }

        /** Closes the named outputs so that buffered records are flushed. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    /**
     * Command-line entry point; the process exit code mirrors the job result.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     */
    public static void main(String[] args) {
        System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
                "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
        System.setProperty("javax.xml.parsers.SAXParserFactory",
                "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

        TestMultipleOutputsJob instance = new TestMultipleOutputsJob();
        int exitCode;
        try {
            exitCode = instance.run(args);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure to the calling shell/scheduler instead of exiting with 0.
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}


使用hadoop jar命令调度执行jar包代码:
hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob \
--input /user/pms/workspace/ouyangyewei/testMultipleOutputs \
--output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output


程序执行以后,输出的结果:
[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
Found 4 items
-rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
-rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
-rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS
-rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
Source1	0001
Source1	0003
Source1	0005
Source3	0007
Source3	0008

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
Source2	0002
Source2	0004
Source2	0006
Source3	0007
Source3	0008


补充于2014-12-18:

这种方式的缺陷是会在输出目录下产生很多以Source1或Source2开头的文件。更好的做法是指定baseOutputPath,将Source1开头的文件放在同一个文件夹中管理(Source2同理)。

对上述代码进行改写实现文件夹管理:

/**
 * Routes one input line of the form {@code <source>-<payload>} to the named
 * output(s) it belongs to, placing each named output's files under its own
 * sub-directory via the {@code baseOutputPath} argument
 * ({@code Source1/part}, {@code Source2/part}).
 *
 * <p>Lines without a {@code -} separator are counted under the
 * {@code MALFORMED_LINES} counter and skipped instead of failing the task.
 * Lines whose source is not Source1/Source2/Source3 are ignored.
 */
@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // Limit 2: split only on the first '-', so a payload that itself
    // contains '-' is kept whole.
    String[] parts = value.toString().split("-", 2);
    if (parts.length < 2) {
        context.getCounter("TestMultipleOutputsJob", "MALFORMED_LINES").increment(1);
        return;
    }

    String source = parts[0];
    Text outKey = new Text(source);
    Text outValue = new Text(parts[1]);

    // Source3 belongs to both sets, so it is written to both outputs.
    boolean inBoth = "Source3".equals(source);
    if (inBoth || "Source1".equals(source)) {
        // Data of set A.
        mos.write("Source1", outKey, outValue, "Source1/part");
    }
    if (inBoth || "Source2".equals(source)) {
        // Data of set B.
        mos.write("Source2", outKey, outValue, "Source2/part");
    }
}
程序执行以后,输出的结果:

$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output
Found 4 items
-rw-r--r--   3 pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/_SUCCESS
-rw-r--r--   3 pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/part-r-00000
drwxr-xr-x   - pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
drwxr-xr-x   - pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
Found 1 items
-rw-r--r--   3 pms pms   65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1/part-r-00000

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2
Found 1 items
-rw-r--r--   3 pms pms   65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2/part-r-00000


可以参考:http://dirlt.com/mapred.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐