[Hadoop] Chained MapReduce jobs: using ChainMapper and ChainReducer
2015-12-26 13:11
Notes:
1. I am currently on version 1.2.1, so ChainMapper here still uses the old API.
2. The old API only supports the N-Mapper + 1-Reducer pattern; the only restriction is that the Reducer must not sit at the very start of the chain.
For example:
Map1 -> Map2 -> Reducer -> Map3 -> Map4
(I am not sure whether the new API supports an N-Reducer pattern, but the new API is certainly much simpler and cleaner.)
The task:
This task takes two steps:
1. Run WordCount over a piece of text.
2. Keep only the words that appear at least 5 times.
WordCount is familiar ground; given the version constraint, here it is first with the old API:
package hadoop_in_action_exersice;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class ChainedJobs {

    public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        public static final int LOW_LIMIT = 5;

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens())
                output.collect(new Text(st.nextToken()), one);
        }
    }

    public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {

        JobConf conf = new JobConf(ChainedJobs.class);
        conf.setJobName("wordcount");                    // set a user-defined job name
        conf.setOutputKeyClass(Text.class);              // set the job's output key class
        conf.setOutputValueClass(IntWritable.class);     // set the job's output value class
        conf.setMapperClass(TokenizeMapper.class);       // set the Mapper class
        conf.setCombinerClass(TokenizeReducer.class);    // set the Combiner class
        conf.setReducerClass(TokenizeReducer.class);     // set the Reducer class
        conf.setInputFormat(TextInputFormat.class);      // set the InputFormat implementation
        conf.setOutputFormat(TextOutputFormat.class);    // set the OutputFormat implementation

        // Remove the output folder before running the job(s)
        FileSystem fs = FileSystem.get(conf);
        String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
        Path op = new Path(outputPath);
        if (fs.exists(op)) {
            fs.delete(op, true);
            System.out.println("Output path already exists; deleted it.");
        }

        FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        JobClient.runJob(conf);                          // run the job
    }
}
The above is a standalone job that completes step one. To carry out step two in the same run, we need to modify it. To make the modification easier to follow, here is a sample of what the first step produces (and what the filter step will consume):
accessed 3
accessible 4
accomplish 1
accounting 7
accurately 1
acquire 1
across 1
actual 1
actually 1
add 3
added 2
addition 1
additional 4
One real drawback of the old-API implementation is that it does not support setup()/cleanup(), so it is worth migrating to Hadoop 2.X whenever possible; the new API is much more convenient and concise.
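For comparison, here is a minimal sketch of the same Map+ / Reduce Map* chain written against the new org.apache.hadoop.mapreduce API (Hadoop 2.x). I have not run this on my setup, and NewTokenizeMapper, NewTokenizeReducer and NewRangeFilterMapper are hypothetical new-API ports of the classes in this post (extending the new Mapper/Reducer base classes, which do support setup()/cleanup()):

package hadoop_in_action_exersice;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobsNewApi {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "chained-wordcount");
        job.setJarByClass(ChainedJobsNewApi.class);

        // Same Map+ -> Reduce -> Map* chain as the old-API job later in this
        // post; the new API drops the byValue flag and takes a plain
        // Configuration per chain element. The three chained classes are
        // hypothetical new-API ports of TokenizeMapper / TokenizeReducer /
        // RangeFilterMapper.
        ChainMapper.addMapper(job, NewTokenizeMapper.class,
                LongWritable.class, Text.class, Text.class, IntWritable.class,
                new Configuration(false));
        ChainReducer.setReducer(job, NewTokenizeReducer.class,
                Text.class, IntWritable.class, Text.class, IntWritable.class,
                new Configuration(false));
        ChainReducer.addMapper(job, NewRangeFilterMapper.class,
                Text.class, IntWritable.class, Text.class, IntWritable.class,
                new Configuration(false));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Each element gets its own new Configuration(false) (an empty configuration, no defaults loaded), so per-stage settings do not leak between chain elements.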
Back to the old API: below is the extra Mapper that was added to do the filtering:
public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    public void map(Text key, IntWritable value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // LOW_LIMIT is declared in TokenizeMapper, so it must be qualified here
        if (value.get() >= TokenizeMapper.LOW_LIMIT) {
            output.collect(key, value);
        }
    }
}
This Mapper is very simple: for each key, it emits the pair only if its value >= LOW_LIMIT.
So far, then, the task chain is:
TokenizeMapper -> TokenizeReducer -> RangeFilterMapper
Accordingly, main becomes the following:
// Note: the chained version additionally requires
//   import org.apache.hadoop.mapred.lib.ChainMapper;
//   import org.apache.hadoop.mapred.lib.ChainReducer;
public static void main(String[] args) throws IOException {

    JobConf conf = new JobConf(ChainedJobs.class);
    conf.setJobName("wordcount");                       // set a user-defined job name
    // conf.setOutputKeyClass(Text.class);              // set the job's output key class
    // conf.setOutputValueClass(IntWritable.class);     // set the job's output value class
    // conf.setMapperClass(TokenizeMapper.class);       // set the Mapper class
    // conf.setCombinerClass(TokenizeReducer.class);    // set the Combiner class
    // conf.setReducerClass(TokenizeReducer.class);     // set the Reducer class
    // conf.setInputFormat(TextInputFormat.class);      // set the InputFormat implementation
    // conf.setOutputFormat(TextOutputFormat.class);    // set the OutputFormat implementation

    // Step 1: mapper for word count
    JobConf wordCountMapper = new JobConf(false);
    ChainMapper.addMapper(conf,
            TokenizeMapper.class,
            LongWritable.class,   // input key type
            Text.class,           // input value type
            Text.class,           // output key type
            IntWritable.class,    // output value type
            false,                // byValue or byReference (pass by value vs. by reference)
            wordCountMapper);

    // Step 2: reducer for word count
    JobConf wordCountReducer = new JobConf(false);
    ChainReducer.setReducer(conf,
            TokenizeReducer.class,
            Text.class,
            IntWritable.class,
            Text.class,
            IntWritable.class,
            false,
            wordCountReducer);

    // Step 3: mapper used as a filter
    JobConf rangeFilterMapper = new JobConf(false);
    ChainReducer.addMapper(conf,
            RangeFilterMapper.class,
            Text.class,
            IntWritable.class,
            Text.class,
            IntWritable.class,
            false,
            rangeFilterMapper);

    // Remove the output folder before running the job(s)
    FileSystem fs = FileSystem.get(conf);
    String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
    Path op = new Path(outputPath);
    if (fs.exists(op)) {
        fs.delete(op, true);
        System.out.println("Output path already exists; deleted it.");
    }

    FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    JobClient.runJob(conf);                             // run the job
}
Here is part of the output of the run:
a 40
and 26
are 12
as 6
be 7
been 8
but 5
by 5
can 12
change 5
data 5
files 7
for 28
from 5
has 7
have 8
if 6
in 27
is 16
it 13
more 8
not 5
of 23
on 5
outputs 5
see 6
so 11
that 11
the 54