
Chaining Pre- and Post-Processing Stages in a Hadoop MapReduce Job

2013-06-17 17:25
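This example, built on the classic org.apache.hadoop.mapred API, chains pre- and post-processing steps around a single reducer: ChainMapper adds Map1 and Map2 in front of the Reduce stage, and ChainReducer attaches Map3 and Map4 behind it, so the whole pipeline Map1 -> Map2 -> Reduce -> Map3 -> Map4 runs as one MapReduce job. Because the mappers within each chain execute in the same JVM thread, records flow between them in memory rather than through intermediate files (the normal shuffle still separates the map chain from the reduce chain).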
package com.test;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Chaining a job's pre- and post-processing stages with ChainMapper and
 * ChainReducer (the [MAP+ / REDUCE MAP*] pattern).
 *
 * @author Administrator
 */
public class MyJobLink extends Configured implements Tool {

    /**
     * Placeholder reducer: ignores the grouped values and emits the
     * constant pair ("1", "1") for every key.
     */
    public static class Reduce extends MapReduceBase implements
            Reducer<LongWritable, Text, Text, Text> {

        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new Text("1"), new Text("1"));
        }
    }

    /**
     * Pre-processing step 1: swap key and value, emitting
     * (line text, byte offset as text).
     */
    public static class Map1 extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(value, new Text(key.toString()));
        }
    }

    /**
     * Pre-processing step 2: swap back, parsing the offset string
     * produced by Map1 into a LongWritable key.
     */
    public static class Map2 extends MapReduceBase implements
            Mapper<Text, Text, LongWritable, Text> {

        public void map(Text key, Text value,
                OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new LongWritable(Long.parseLong(value.toString())), key);
        }
    }

    /**
     * Post-processing step 1 (runs after the reducer): emit the reducer's
     * output key under the constant key 1.
     */
    public static class Map3 extends MapReduceBase implements
            Mapper<Text, Text, LongWritable, Text> {

        public void map(Text key, Text value,
                OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new LongWritable(1L), key);
        }
    }

    /**
     * Post-processing step 2: collapse everything to the constant
     * pair (1, "1").
     */
    public static class Map4 extends MapReduceBase implements
            Mapper<LongWritable, Text, LongWritable, Text> {

        public void map(LongWritable key, Text value,
                OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new LongWritable(1L), new Text("1"));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf);

        job.setJobName("ChainJob");
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        /*
         * Add the Map1 stage to the job.
         * ChainMapper.addMapper() registers every step that runs before the
         * reducer:
         *
         * ChainMapper.addMapper(JobConf job,
         *     Class<? extends Mapper<LongWritable, Text, Text, Text>> klass,
         *     Class<? extends LongWritable> inputKeyClass,
         *     Class<? extends Text> inputValueClass,
         *     Class<? extends Text> outputKeyClass,
         *     Class<? extends Text> outputValueClass,
         *     boolean byValue,
         *     JobConf mapperConf)
         *
         * The method takes eight arguments. The first and last are the global
         * and local JobConf objects. The second (klass) is the Mapper class
         * that does the processing. The remaining four - inputKeyClass,
         * inputValueClass, outputKeyClass and outputValueClass - are the
         * input/output key and value types of that Mapper.
         *
         * The byValue flag deserves a short explanation. In the standard
         * Mapper model, output key/value pairs are serialized and written to
         * disk (keys and values implement Writable precisely so they can be
         * copied and serialized), waiting to be shuffled to what may be a
         * completely different node. Formally this is pass-by-value: a copy
         * of the pair is sent. Here, however, one Mapper is chained to the
         * next and both execute in the same JVM thread, so the pairs can be
         * passed by reference: the upstream Mapper's output stays in memory
         * and the downstream Mapper references the same memory location.
         * When Map1 calls OutputCollector.collect(K k, V v), the objects k
         * and v are handed directly to Map2's map() method. Large amounts of
         * data may flow between the mappers, and avoiding the copy improves
         * performance. But this breaks a subtle "contract" of the Hadoop
         * MapReduce API: a call to OutputCollector.collect(K k, V v) must
         * never alter the contents of k and v. After calling collect(), Map1
         * may keep using k and v, fully trusting that their values remain
         * unchanged; if those objects are passed to Map2 by reference and
         * Map2 then modifies them, the contract is violated. So: if you are
         * certain that Map1's map() never touches k and v after calling
         * collect(), or that Map2 never changes the values of the k and v it
         * receives, you can set byValue to false for a performance gain. If
         * you are unsure about the mappers' internals, it is safest to leave
         * byValue true and keep pass-by-value semantics, so the mappers work
         * as expected.
         */
        JobConf map1Conf = new JobConf(false);
        ChainMapper.addMapper(job,
                Map1.class,
                LongWritable.class,
                Text.class,
                Text.class,
                Text.class,
                true,
                map1Conf);
        /*
         * Add the Map2 stage; again ChainMapper.addMapper() is used for
         * steps that run before the reducer.
         */
        JobConf map2Conf = new JobConf(false);
        ChainMapper.addMapper(job,
                Map2.class,
                Text.class,
                Text.class,
                LongWritable.class,
                Text.class,
                true,
                map2Conf);

        /*
         * Add the Reduce stage, using the static
         * ChainReducer.setReducer() method.
         */
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job,
                Reduce.class,
                LongWritable.class,
                Text.class,
                Text.class,
                Text.class,
                true,
                reduceConf);
        /*
         * Add the Map3 stage; ChainReducer.addMapper() registers the steps
         * that follow the reducer.
         */
        JobConf map3Conf = new JobConf(false);
        ChainReducer.addMapper(job,
                Map3.class,
                Text.class,
                Text.class,
                LongWritable.class,
                Text.class,
                true,
                map3Conf);
        /*
         * Add the Map4 stage, the last step after the reducer.
         */
        JobConf map4Conf = new JobConf(false);
        ChainReducer.addMapper(job,
                Map4.class,
                LongWritable.class,
                Text.class,
                LongWritable.class,
                Text.class,
                true,
                map4Conf);

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        final String inputPath = "/home/dev/hadooptest/mapin/cite";
        final String outputPath = "/home/dev/hadooptest/mapin/cite/out";
        String[] paths = { inputPath, outputPath };

        /*
         * Control flow: the driver's main() -> ToolRunner.run() -> the
         * Tool.run() implementation above, which reads the arguments and
         * launches the job (argument handling and job setup are merged into
         * the single overridden Tool.run() method).
         */
        int res = ToolRunner.run(new Configuration(), new MyJobLink(), paths);

        System.exit(res);
    }

}
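The byValue discussion in the comments above is easier to see in code. The following mapper is a minimal illustrative sketch, not part of the job above (UpstreamMapper is a made-up name): it keeps using the objects it passed to collect(), which is legal under the API contract, but becomes unsafe under byValue=false chaining if the next mapper in the chain mutates the pair it received by reference; byValue=true avoids the problem by handing the next mapper a serialized copy.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UpstreamMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        Text k = new Text(value);
        Text v = new Text(key.toString());
        output.collect(k, v);
        // The API contract says collect() must never alter k and v, so this
        // read after collect() is safe with byValue=true (the chain copied
        // the pair at collect() time). With byValue=false the next mapper in
        // the chain received references to k and v; if it modified them, the
        // status string below would silently report corrupted values.
        reporter.setStatus("emitted " + k + "=" + v);
    }
}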