Hadoop之MapReduce-倒排索引案例
2015-05-27 18:28
344 查看
一、问题描述
统计每个单词在各个文件中出现的次数。
二、所给数据
1. 输入数据:
a.txt: b.txt
hello tom hello jerry
hello jerry hello tom
hello kitty hello world
hello world
hello tom
2.输出数据
hello a.txt->5 b.txt->3
jerry a.txt->1 b.txt->1
tom a.txt->2 b.txt->1
world a.txt->1 b.txt->1
kitty a.txt->1
三、问题思路(在伪分布式模式下)
1.map阶段
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
2.combiner阶段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
3. Reducer阶段
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5 b.txt->3");
四、代码实现
类 InverseIndex
package edu.jianwei.hadoop.mr.ii;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InverseIndex {
static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
private final Text k = new Text();
private final Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String path = inputSplit.getPath().toString();
for (String word : words) {
k.set(word + "->" + path);
v.set("1");
context.write(k, v);
}
}
}
static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
private final Text key = new Text();
private final Text value = new Text();
@Override
protected void reduce(Text k, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException {
String line = k.toString();
String[] wordAndpath = line.split("->");
key.set(wordAndpath[0]);
int counter = 0;
for (Text v : v2s) {
counter += Integer.parseInt(v.toString());
}
value.set(wordAndpath[1] + "->" + counter);
context.write(key, value);
}
}
static class IndexReducer extends Reducer<Text, Text, Text, Text> {
private final Text v = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String value = "";
for (Text v : values) {
value += v.toString() + " ";
}
v.set(value);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(InverseIndex.class);
job.setMapperClass(IndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setCombinerClass(IndexCombiner.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
五、代码测试
1.代码运行
hadoop jar /root/ii.jar edu.jianwei.hadoop.mr.ii.InverseIndex /inverseIndex /inverseIndex/res
2.输出结果
统计每个单词在各个文件中出现的次数。
二、所给数据
1. 输入数据:
a.txt: b.txt
hello tom hello jerry
hello jerry hello tom
hello kitty hello world
hello world
hello tom
2.输出数据
hello a.txt->5 b.txt->3
jerry a.txt->1 b.txt->1
tom a.txt->2 b.txt->1
world a.txt->1 b.txt->1
kitty a.txt->1
三、问题思路(在伪分布式模式下)
1.map阶段
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
2.combiner阶段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
3. Reducer阶段
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5 b.txt->3");
四、代码实现
类 InverseIndex
package edu.jianwei.hadoop.mr.ii;
import java.io.IOException;
public class InverseIndex {
static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
private final Text k = new Text();
private final Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String path = inputSplit.getPath().toString();
for (String word : words) {
k.set(word + "->" + path);
v.set("1");
context.write(k, v);
}
}
}
static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
private final Text key = new Text();
private final Text value = new Text();
@Override
protected void reduce(Text k, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException {
String line = k.toString();
String[] wordAndpath = line.split("->");
key.set(wordAndpath[0]);
int counter = 0;
for (Text v : v2s) {
counter += Integer.parseInt(v.toString());
}
value.set(wordAndpath[1] + "->" + counter);
context.write(key, value);
}
}
static class IndexReducer extends Reducer<Text, Text, Text, Text> {
private final Text v = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String value = "";
for (Text v : values) {
value += v.toString() + " ";
}
v.set(value);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(InverseIndex.class);
job.setMapperClass(IndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setCombinerClass(IndexCombiner.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
五、代码测试
1.代码运行
hadoop jar /root/ii.jar edu.jianwei.hadoop.mr.ii.InverseIndex /inverseIndex /inverseIndex/res
2.输出结果
相关文章推荐
- hadoop2.5.2学习07--MapReduce应用案例2
- Hadoop集群(第9期)_MapReduce初级案例
- Hadoop集群_MapReduce初级案例
- Hadoop集群_MapReduce初级案例
- hadoop学习第六节:MapReduce应用案例
- hadoop的mapreduce常见算法案例有几种
- Hadoop环境搭建之二配置启动HDFS及本地模式运行MapReduce案例(使用HDFS上数据)
- Hadoop集群MapReduce经典案例
- Hadoop链式MapReduce、多维排序、倒排索引、自连接算法、二次排序、Join性能优化、处理员工信息Join实战、URL流量分析、TopN及其排序、求平均值和最大最小值、数据清洗ETL、分析气
- hadoop编程(5)-MapReduce案例:通过MinimalMapReduce进一步了解MR的机制
- Hadoop MapReduce编程 API入门系列之倒排索引(二十四)
- (2) hadoop 配置部署启动HDFS及本地模式运行MapReduce案例(使用HDFS上数据)
- Hadoop MapReduce编程入门案例
- Hadoop案例之倒排索引
- CentOS系统下的Hadoop集群(第9期)_MapReduce初级案例
- hadoop入门--简单的MapReduce案例
- Hadoop2.7.3 mapreduce(四)倒排索引的实现
- hadoop编程(3)-MapReduce案例:文本去重
- 5.1 MapReduce案例——倒排索引
- Hadoop环境搭建之一安装jdk,hadoop基本配置及运行MapReduce案例在本地模式下