MapReduce实现倒排索引全排序
2017-05-30 10:32
531 查看
本文主要介绍如何利用Hadoop自带的TotalOrderPartitionner进行全排序。本文通过实现一个带词频属性的倒排索引,并统计每个词语的平均出现次数,使我们能够根据每个词语的平均出现次数来对词语进行全排序。
平均出现次数=词语在全部文档的出现频数/词语出现的文档数
(本文假设机器已经安装好Hadoop,可以参考我博客的另一篇文章)
Job1 Map阶段结果:(word, doc:1)
这样在数据传到Reduce阶段的时候,只需统计同一个Key(词语)的Value数量即可得到词语的出现总次数,同时可以用Set集合或者用Map加上自行判断即可统计出Value中的文档数,将词语总出现次数除以词语出现的文档数即可得到平均出现次数。
Job1 Reduce阶段结果:(word, (average_count; doc1:N, doc2:M…))
本文通过另一个Job来实现全排序。实现全局排序的关键在于Partition的实现,因为MapReduce自身在Sort阶段是可以自动进行排序的,但是关键在于如何在集群多台机器上进行合理的数据分发,使得Reduce阶段各机器只需对分发到的数据进行排序,最后归并即可得到全局排序的数据。
为了方便排序,可以先在之前一个Job的Reduce阶段输出过程中,将词语平均出现次数暂时输出到行首(也就是Key),其他的原样输出到Value。
Job2 Map阶段结果:(average_count, (word, (…)))
同时本程序直接使用了Hadoop自带的TotalOrderPartitioner类,该类需要使用采样器才可较好地工作,所以本程序使用了RandomSampler类。但是由于RandomSampler要求InputFormat的Key和Map阶段输出时使用的Key是同一类型,因而将第一个Job的OutputFormatClass设置为了SequenceFileOutputFormat类,并同时将第二个Job的InputFormatClass设置为了SequenceFileInputFormat类。
Job2 Reduce阶段结果:(word, (average_count; doc1:N, doc2:M…))
注:因为为这两个Job的Map阶段和最后输出阶段(Reduce)使用的OutputKey类型都不一样,所以需要在Job明确指明相应的类型。
第一个Job的Mapper:InvertedIndexMapper.java
第一个Job的Reducer:InvertedIndexReducer.java
第二个Job的Mapper:IndexSortMapper.java(该类没什么特殊的,仅作输入用)
第二个Job的Reducer:IndexSortReducer.java
最后可以通过打包成jar包放在安装了Hadoop的集群上(本地目录),然后通过在jar包目录运行”hadoop jar …”即可运行本程序:
jar包的运行方式:
示例:
平均出现次数=词语在全部文档的出现频数/词语出现的文档数
(本文假设机器已经安装好Hadoop,可以参考我博客的另一篇文章)
MapReduce设计思路
首先实现“带词频属性的文档倒排算法”,同时还要统计每个词语的平均出现次数。具体实现时,可以在Map阶段通过扫描行的过程中为每个词语生成一个二元组:Job1 Map阶段结果:(word, doc:1)
这样在数据传到Reduce阶段的时候,只需统计同一个Key(词语)的Value数量即可得到词语的出现总次数,同时可以用Set集合或者用Map加上自行判断即可统计出Value中的文档数,将词语总出现次数除以词语出现的文档数即可得到平均出现次数。
Job1 Reduce阶段结果:(word, (average_count; doc1:N, doc2:M…))
本文通过另一个Job来实现全排序。实现全局排序的关键在于Partition的实现,因为MapReduce自身在Sort阶段是可以自动进行排序的,但是关键在于如何在集群多台机器上进行合理的数据分发,使得Reduce阶段各机器只需对分发到的数据进行排序,最后归并即可得到全局排序的数据。
为了方便排序,可以先在之前一个Job的Reduce阶段输出过程中,将词语平均出现次数暂时输出到行首(也就是Key),其他的原样输出到Value。
Job2 Map阶段结果:(average_count, (word, (…)))
同时本程序直接使用了Hadoop自带的TotalOrderPartitioner类,该类需要使用采样器才可较好地工作,所以本程序使用了RandomSampler类。但是由于RandomSampler要求InputFormat的Key和Map阶段输出时使用的Key是同一类型,因而将第一个Job的OutputFormatClass设置为了SequenceFileOutputFormat类,并同时将第二个Job的InputFormatClass设置为了SequenceFileInputFormat类。
Job2 Reduce阶段结果:(word, (average_count; doc1:N, doc2:M…))
注:因为为这两个Job的Map阶段和最后输出阶段(Reduce)使用的OutputKey类型都不一样,所以需要在Job明确指明相应的类型。
代码实现
程序入口:InvertedIndexer.java (程序已经添加注释,方便读者理解)public class InvertedIndexer { public static void main(String[] args) { Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); Path tmpPath = new Path(args[1]+"_tmp"); Path partitionFile = new Path(args[1]+"_partition_file"); int reduceNum = Integer.parseInt(args[2]); try { Configuration conf = new Configuration(); InputSampler.RandomSampler<Text,Text> sampler = new InputSampler.RandomSampler<Text,Text>(0.1,1000,10); TotalOrderPartitioner.setPartitionFile(conf, partitionFile); Job job = Job.getInstance(conf,"invert index"); job.setJarByClass(InvertedIndexer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapperClass(InvertedIndexMapper.class); job.setReducerClass(InvertedIndexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(DoubleWritable.class); job.setOutputValueClass(Text.class); //HDFS输入路径 FileInputFormat.addInputPath(job,inputPath); //HDFS输出路径 //MapReduce中不允许已存在的输出路径,因而先删除 //tmpPath.getFileSystem(conf).delete(tmpPath, true); FileOutputFormat.setOutputPath(job,tmpPath); if(job.waitForCompletion(true)){ Job sortJob = Job.getInstance(conf,"index sort"); sortJob.setJarByClass(InvertedIndexer.class); sortJob.setInputFormatClass(SequenceFileInputFormat.class); sortJob.setMapperClass(IndexSortMapper.class); sortJob.setPartitionerClass(TotalOrderPartitioner.class); sortJob.setReducerClass(IndexSortReducer.class); sortJob.setMapOutputKeyClass(DoubleWritable.class); sortJob.setMapOutputValueClass(Text.class); sortJob.setOutputKeyClass(Text.class); sortJob.setOutputValueClass(Text.class); sortJob.setNumReduceTasks(reduceNum); FileInputFormat.addInputPath(sortJob,tmpPath); outputPath.getFileSystem(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(sortJob,outputPath); InputSampler.writePartitionFile(sortJob, sampler); System.exit(sortJob.waitForCompletion(true)?0:1); } System.exit(1); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } }
第一个Job的Mapper:InvertedIndexMapper.java
public class InvertedIndexMapper extends Mapper<Object,Text,Text,Text> { private Text word = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) context.getInputSplit(); //仅截取文件(小说)的名字,去掉扩展格式符 String docName = (fileSplit.getPath().getName().split("\\."))[0]; String[] words = value.toString().split("\\s"); //分割词语 for(int i=0;i<words.length;i++){ word.set(words[i]); //按照(词语,文档名:1)输出传递给Reduce context.write(word,new Text(docName+":"+1)); } } }
第一个Job的Reducer:InvertedIndexReducer.java
public class InvertedIndexReducer extends Reducer<Text,Text,DoubleWritable,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String,Integer> map = new HashMap<String,Integer>(); Iterator<Text> it = values.iterator(); //item[0]用于保存词语,itemp[1]用于保存频数(Map传递过来的一般是1) String[] item; if(it.hasNext()){ item = it.next().toString().split(":"); map.put(item[0],1); } while(it.hasNext()){ item = it.next().toString().split(":"); if(map.containsKey(item[0])){ map.put(item[0],((int)map.get(item[0]))+1); }else{ map.put(item[0],1); } } //计算词语平均出现次数 Iterator<Integer> valueIterator = map.values().iterator(); long count = 0L; double average = 0L; while(valueIterator.hasNext()){ c 4000 ount += valueIterator.next(); } average = (double)count/map.size(); //Reduce输出(平均出现次数,词语\t平均出现次数,文档名:N;...) StringBuilder docIndex = new StringBuilder(); Iterator<Map.Entry<String,Integer>> entryIterator =map.entrySet().iterator(); Map.Entry<String,Integer> entry; if(entryIterator.hasNext()){ entry = entryIterator.next(); docIndex.append(entry.getKey()).append(":").append(entry.getValue()); } while(entryIterator.hasNext()){ docIndex.append(";"); entry = entryIterator.next(); docIndex.append(entry.getKey()).append(":").append(entry.getValue()); } context.write(new DoubleWritable(average),new Text(key.toString() + "\t" + average+","+docIndex.toString())); } }
第二个Job的Mapper:IndexSortMapper.java(该类没什么特殊的,仅作输入用)
public class IndexSortMapper extends Mapper<DoubleWritable,Text,DoubleWritable,Text> { @Override protected void map(DoubleWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key,value); } }
第二个Job的Reducer:IndexSortReducer.java
public class IndexSortReducer extends Reducer<DoubleWritable,Text,Text,Text> { @Override protected void reduce(DoubleWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Iterator<Text> it = values.iterator(); Text newValue = new Text(); Text newKey = new Text(); String[] strs; while (it.hasNext()){ //鉴于传到Reduce阶段的时候数据已经排好序了,因而直接将Key(平均出现次数)去掉 //保留原来的Value并还原成(词语,平均出现次数,文档名:N;...) strs = it.next().toString().split("\\t"); newKey.set(strs[0]); newValue.set(strs[1]); context.write(newKey,newValue); } } }
最后可以通过打包成jar包放在安装了Hadoop的集群上(本地目录),然后通过在jar包目录运行”hadoop jar …”即可运行本程序:
jar包的运行方式:
hadoop jar InvertedIndexer.jar {hdfs-in} {hdfs-out} {reduce-num}
示例:
hadoop jar InvertedIndexer.jar /in /out 5
相关文章推荐
- mapreduce实现对key的排序
- mapreduce (三) MapReduce实现倒排索引(二)
- mapreduce实现对key的排序
- MapReduce之三—搜索引擎-倒排索引实现
- mapreduce实现排序并且找出销量最多的数据
- topk在mapreduce下面的统计加排序的实现
- Mapreduce---RandomSampler采样实现全排序
- 利用采样器实现mapreduce任务输出全排序
- mapreduce实现流量汇总排序程序
- Hadoop1.x MapReduce 实现二次排序 实现WritableComparable接口
- MapReduce实现排序功能
- mapreduce实现倒排索引
- 2018-07-28期 MapReduce实现对数字排序
- mapreduce 利用InverseMapper.class对key,value进行 交换实现词频排序
- MapReduce实现手机上网日志分析(排序)
- MapReduce实现排序功能
- MapReduce实现倒排序索引
- Hadoop2.7.3 mapreduce(四)倒排索引的实现
- MapReduce实现分组排序
- MapReduce实现倒排索引