您的位置:首页 > 运维架构

MapReduce实现倒排索引全排序

2017-05-30 10:32 531 查看
本文主要介绍如何利用Hadoop自带的TotalOrderPartitionner进行全排序。本文通过实现一个带词频属性的倒排索引,并统计每个词语的平均出现次数,使我们能够根据每个词语的平均出现次数来对词语进行全排序。

平均出现次数=词语在全部文档的出现频数/词语出现的文档数

(本文假设机器已经安装好Hadoop,可以参考我博客的另一篇文章)

MapReduce设计思路

首先实现“带词频属性的文档倒排算法”,同时还要统计每个词语的平均出现次数。具体实现时,可以在Map阶段通过扫描行的过程中为每个词语生成一个二元组:

Job1 Map阶段结果:(word, doc:1)

这样在数据传到Reduce阶段的时候,只需统计同一个Key(词语)的Value数量即可得到词语的出现总次数,同时可以用Set集合或者用Map加上自行判断即可统计出Value中的文档数,将词语总出现次数除以词语出现的文档数即可得到平均出现次数。

Job1 Reduce阶段结果:(word, (average_count; doc1:N, doc2:M…))

本文通过另一个Job来实现全排序。实现全局排序的关键在于Partition的实现,因为MapReduce自身在Sort阶段是可以自动进行排序的,但是关键在于如何在集群多台机器上进行合理的数据分发,使得Reduce阶段各机器只需对分发到的数据进行排序,最后归并即可得到全局排序的数据。

为了方便排序,可以先在之前一个Job的Reduce阶段输出过程中,将词语平均出现次数暂时输出到行首(也就是Key),其他的原样输出到Value。

Job2 Map阶段结果:(average_count, (word, (…)))

同时本程序直接使用了Hadoop自带的TotalOrderPartitioner类,该类需要使用采样器才可较好地工作,所以本程序使用了RandomSampler类。但是由于RandomSampler要求InputFormat的Key和Map阶段输出时使用的Key是同一类型,因而将第一个Job的OutputFormatClass设置为了SequenceFileOutputFormat类,并同时将第二个Job的InputFormatClass设置为了SequenceFileInputFormat类。

Job2 Reduce阶段结果:(word, (average_count; doc1:N, doc2:M…))

注:因为为这两个Job的Map阶段和最后输出阶段(Reduce)使用的OutputKey类型都不一样,所以需要在Job明确指明相应的类型。

代码实现

程序入口:InvertedIndexer.java (程序已经添加注释,方便读者理解)

public class InvertedIndexer {
public static void main(String[] args) {
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Path tmpPath = new Path(args[1]+"_tmp");
Path partitionFile = new Path(args[1]+"_partition_file");
int reduceNum = Integer.parseInt(args[2]);

try {
Configuration conf = new Configuration();
InputSampler.RandomSampler<Text,Text> sampler =
new InputSampler.RandomSampler<Text,Text>(0.1,1000,10);
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

Job job = Job.getInstance(conf,"invert index");
job.setJarByClass(InvertedIndexer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(DoubleWritable.class);
job.setOutputValueClass(Text.class);
//HDFS输入路径
FileInputFormat.addInputPath(job,inputPath);
//HDFS输出路径
//MapReduce中不允许已存在的输出路径,因而先删除
//tmpPath.getFileSystem(conf).delete(tmpPath, true);
FileOutputFormat.setOutputPath(job,tmpPath);

if(job.waitForCompletion(true)){
Job sortJob = Job.getInstance(conf,"index sort");
sortJob.setJarByClass(InvertedIndexer.class);
sortJob.setInputFormatClass(SequenceFileInputFormat.class);
sortJob.setMapperClass(IndexSortMapper.class);
sortJob.setPartitionerClass(TotalOrderPartitioner.class);
sortJob.setReducerClass(IndexSortReducer.class);
sortJob.setMapOutputKeyClass(DoubleWritable.class);
sortJob.setMapOutputValueClass(Text.class);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(Text.class);
sortJob.setNumReduceTasks(reduceNum);

FileInputFormat.addInputPath(sortJob,tmpPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileOutputFormat.setOutputPath(sortJob,outputPath);
InputSampler.writePartitionFile(sortJob, sampler);

System.exit(sortJob.waitForCompletion(true)?0:1);
}
System.exit(1);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}


第一个Job的Mapper:InvertedIndexMapper.java

public class InvertedIndexMapper extends Mapper<Object,Text,Text,Text> {
private Text word = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
//仅截取文件(小说)的名字,去掉扩展格式符
String docName = (fileSplit.getPath().getName().split("\\."))[0];
String[] words = value.toString().split("\\s"); //分割词语

for(int i=0;i<words.length;i++){
word.set(words[i]);
//按照(词语,文档名:1)输出传递给Reduce
context.write(word,new Text(docName+":"+1));
}
}
}


第一个Job的Reducer:InvertedIndexReducer.java

public class InvertedIndexReducer extends Reducer<Text,Text,DoubleWritable,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String,Integer> map = new HashMap<String,Integer>();
Iterator<Text> it = values.iterator();

//item[0]用于保存词语,itemp[1]用于保存频数(Map传递过来的一般是1)
String[] item;
if(it.hasNext()){
item = it.next().toString().split(":");
map.put(item[0],1);
}
while(it.hasNext()){
item = it.next().toString().split(":");
if(map.containsKey(item[0])){
map.put(item[0],((int)map.get(item[0]))+1);
}else{
map.put(item[0],1);
}
}

//计算词语平均出现次数
Iterator<Integer> valueIterator = map.values().iterator();
long count = 0L;
double average = 0L;
while(valueIterator.hasNext()){
c
4000
ount += valueIterator.next();
}
average = (double)count/map.size();

//Reduce输出(平均出现次数,词语\t平均出现次数,文档名:N;...)
StringBuilder docIndex = new StringBuilder();
Iterator<Map.Entry<String,Integer>> entryIterator =map.entrySet().iterator();
Map.Entry<String,Integer> entry;
if(entryIterator.hasNext()){
entry = entryIterator.next();
docIndex.append(entry.getKey()).append(":").append(entry.getValue());
}
while(entryIterator.hasNext()){
docIndex.append(";");
entry = entryIterator.next();
docIndex.append(entry.getKey()).append(":").append(entry.getValue());
}
context.write(new DoubleWritable(average),new Text(key.toString() + "\t" + average+","+docIndex.toString()));
}
}


第二个Job的Mapper:IndexSortMapper.java(该类没什么特殊的,仅作输入用)

public class IndexSortMapper extends Mapper<DoubleWritable,Text,DoubleWritable,Text> {
@Override
protected void map(DoubleWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(key,value);
}
}


第二个Job的Reducer:IndexSortReducer.java

public class IndexSortReducer extends Reducer<DoubleWritable,Text,Text,Text> {
@Override
protected void reduce(DoubleWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator<Text> it = values.iterator();
Text newValue = new Text();
Text newKey = new Text();
String[] strs;
while (it.hasNext()){
//鉴于传到Reduce阶段的时候数据已经排好序了,因而直接将Key(平均出现次数)去掉
//保留原来的Value并还原成(词语,平均出现次数,文档名:N;...)
strs = it.next().toString().split("\\t");
newKey.set(strs[0]);
newValue.set(strs[1]);
context.write(newKey,newValue);
}
}
}


最后可以通过打包成jar包放在安装了Hadoop的集群上(本地目录),然后通过在jar包目录运行”hadoop jar …”即可运行本程序:

jar包的运行方式:

hadoop jar InvertedIndexer.jar {hdfs-in} {hdfs-out} {reduce-num}


示例:

hadoop jar InvertedIndexer.jar /in /out 5
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息