MapReduce 案例之倒排索引
2018-02-09 09:06
120 查看
MapReduce 案例之倒排索引
1. 倒排索引
倒排索引是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。 它主要是用来存储某个单词(或词组) 在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引( Inverted Index)。2. 实例描述
通常情况下,倒排索引由一个单词(或词组)以及相关的文档列表组成,文档列表中的文档或者是标识文档的 ID 号,或者是指文档所在位置的 URL。如下图所示:从上图可以看出,单词 1 出现在{文档 1,文档 5,文档 13, ……}中,单词 2 出现在{文档 2,文档 3,文档 5, ……}中,而单词 3 出现在{文档 2,文档 10,文档 16, ……}中。在实际应用中,还需要给每个文档添加一个权值,用来指出每个文档与搜索内容的相关度,如下图所示:
最常用的是使用词频作为权重,即记录单词在文档中出现的次数。以英文为例,如下图所示,索引文件中的“ MapReduce”一行表示:“ MapReduce”这个单词在文本 T0 中 出现过 1 次,T1 中出现过 1 次,T2 中出现过 2 次。当搜索条件为“ MapReduce”、“ is”、“ Simple” 时,对应的集合为: {T0, T1, T2}∩{T0, T1}∩{T0, T1}={T0, T1},即文档 T0 和 T1 包 含了所要索引的单词,而且只有 T0 是连续的。
3. 设计思路
3.1 Map过程
首先使用默认的 TextInputFormat 类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然, Map 过程首先必须分析输入的key/value对,得到倒排索引中需要的三个信息:单词、文档 URL 和词频,如下图所示。这里存在两个问题:第一, key/value对只能有两个值,在不使用Hadoop 自定义数据类型的情况下,需要根据情况将其中两个值合并成一个值,作为 key 或 value 值;
第二,通过一个 Reduce 过程无法同时完成词频统计和生成文档列表,所以必须增加一个 Combine 过程完成词频统计。
这里将单词和 URL 组成 key 值(如“ MapReduce: file1.txt”),将词频作为value,这样做的好处是可以利用 MapReduce 框架自带的Map 端排序,将同一文档的相同单词的词频组成列表,传递给 Combine 过程,实现类似于 WordCount 的功能。
3.2 Combine 过程
经过 map 方法处理后, Combine 过程将 key 值相同 value 值累加,得到一个单词在文档中的词频。 如果直接将图所示的输出作为 Reduce 过程的输入,在 Shuffle 过程时将面临一个问题:所有具有相同单词的记录(由单词、 URL 和词频组成)应该交由同一个Reducer 处理,但当前的 key 值无法保证这一点,所以必须修改 key 值和 value 值。这次将单词作为 key 值, URL 和词频组成 value 值(如“ file1.txt: 1”)。这样做的好处是可以利用 MapReduce 框架默认的 HashPartitioner 类完成 Shuffle 过程,将相同单词的所有记录发送给同一个 Reducer 进行处理。3.3 Reduce 过程
经过上述两个过程后, Reduce 过程只需将相同 key 值的 value 值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给 MapReduce 框架进行处理了。3.4 程序代码
pom文件<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itcast</groupId> <artifactId>invertedIndex</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>invertedIndex</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.4</version> </depe 4000 ndency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>cn.itcast.hadoop.mrwc.WordCountDriver</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
Map程序
package cn.itcast.hadoop.mr.invertedIndex; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>{ private static Text keyInfo = new Text();// 存储单词和 URL 组合 private static final Text valueInfo = new Text("1");// 存储词频,初始化为1 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(" ");// 得到字段数组 FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到这行数据所在的文件切片 String fileName = fileSplit.getPath().getName();// 根据文件切片得到文件名 for (String field : fields) { // key值由单词和URL组成,如“MapReduce:file1” keyInfo.set(field + ":" + fileName); context.write(keyInfo, valueInfo); } } }
combine程序
package cn.itcast.hadoop.mr.invertedIndex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{ private static Text info = new Text(); // 输入: <MapReduce:file3 {1,1,...}> // 输出:<MapReduce file3:2> @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sum = 0;// 统计词频 for (Text value : values) { sum += Integer.parseInt(value.toString()); } int splitIndex = key.toString().indexOf(":"); // 重新设置 value 值由 URL 和词频组成 info.set(key.toString().substring(splitIndex + 1) + ":" + sum); // 重新设置 key 值为单词 key.set(key.toString().substring(0, splitIndex)); context.write(key, info); } }
reduce程序
package cn.itcast.hadoop.mr.invertedIndex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{ private static Text result = new Text(); // 输入:<MapReduce file3:2> // 输出:<MapReduce file1:1;file2:1;file3:2;> @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 生成文档列表 String fileList = new String(); for (Text value : values) { fileList += value.toString() + ";"; } result.set(fileList); context.write(key, result); } }
主程序
package cn.itcast.hadoop.mr.invertedIndex; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class InvertedIndexRunner { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(InvertedIndexRunner.class); job.setMapperClass(InvertedIndexMapper.class); job.setCombinerClass(InvertedIndexCombiner.class); job.setReducerClass(InvertedIndexReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("D:\\ziliao\\data\\InvertedIndex\\input")); // 指定处理完成之后的结果所保存的位置 FileOutputFormat.setOutputPath(job, new Path("D:\\ziliao\\data\\InvertedIndex\\output")); // 向 yarn 集群提交这个 job boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
喜欢就点赞评论+关注吧
感谢阅读,希望能帮助到大家,谢谢大家的支持!
相关文章推荐
- 5.1 MapReduce案例——倒排索引
- 大数据_Shuffle、MapReduce编程案例(数据去重、多表查询、倒排索引、使用单元测试)
- Hadoop之MapReduce-倒排索引案例
- Hbase与Mapreduce集成的案例
- HBase(十一):HBaseAndMapReduce小案例总结
- 一些算法的MapReduce实现——倒排索引实现
- Hadoop 案例13----倒排索引
- Hadoop集群MapReduce经典案例
- MapReduce案例学习(2) 求各个部门的人数和平均工资
- mapreduce实现搜索引擎简单的倒排索引
- HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引
- MapReduce练习二(单表关联,多表关联,倒排索引)
- 案例一:基于MapReduce求每年的最大天气
- java8之Lambda表达式 4:MapReduce开发案例
- Hadoop MapReduce 简单案例--求平均值
- Hadoop MapReduce 简单案例--求素数个数
- mapreduce--倒排索引
- Hadoop之道--MapReduce简单应用倒排索引(InversedIndex)
- MapReduce初级经典案例实现
- MapReduce案例8——求最频繁访问数据表以及最频繁访问的用户和时长