您的位置:首页 > 其它

一些算法的MapReduce实现——倒排索引实现

2014-01-05 14:52 921 查看

Introduce to Inverted List

倒排索引(英语:Inverted index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。

有两种不同的反向索引形式:

一条记录的水平反向索引(或者反向档案索引)包含每个引用单词的文档的列表。
一个单词的水平反向索引(或者完全反向索引)又包含每个单词在一个文档中的位置。

后者的形式提供了更多的兼容性(比如短语搜索),但是需要更多的时间和空间来创建。

英文为例,下面是要被索引的文本:


"it
 is what it is"



"what
 is it"



"it
 is a banana"


我们就能得到下面的反向文件索引:
"a":      		{2}
 "banana": 	{2}
 "is":     		{0, 1, 2}
 "it":     		{0, 1, 2}
 "what":   		{0, 1}

后面的编号对应的就是文档号,例如,“a”只在T2中出现

其实倒排列表还可以存储额外的信息,像tf(term freq),df(doc freq),postition(term posititon),例如如下所示额外存储了term在文档中的位置

"a":      		{(2, 2)}
"banana": 	{(2, 3)}
"is":     		{(0, 1), (0, 4), (1, 1), (2, 1)}
"it":     		{(0, 0), (0, 3), (1, 2), (2, 0)} 
"what":   		{(0, 2), (1, 0)}

详细的倒排索引知识请见参考[1]的文章。

MapReduce Implementation

实现一

下面我将实现的倒排列表时带有term freq信息的。MapReduce实现很简单,最最简单的伪代码实现如下:

class MAPPER
	procedure MAP(docid n, doc d)
		for all term t in doc d
			EMIT(term t, <n, 1>)
			
class REDUCER
	method INITIALIZE
		t(pre) <-- null
	procedure REDUCER(term t, postings[<docid n1, tf1>,<docid n2, tf2>....])
		P <-- new ASSOCIATIVE_SORTED_MAP
		if t(pre) != t AND t(pre) != null
			EMIT(t, P)
			P.RESET
		for all posting<n, tf> in postings[....]
			P{n, tf} = P{n, tf++};
	method CLOSE
		EMIT(term t, P)


以上实现中,Mapper每次都提交一个三元组<term t, docid n, 1>,然后在Reducer端,在对相同的term进行每个doc 的tf统计,最后提交。方法确实很简单,基本上所有的工作都在Reducer端完成,Mapper端只是分解doc提交而已,如果一篇文章很长,每个term出现的次数又比较多,那么这样导致mapper提交次数过多,生成的中间结果过多,网络传输就会很频繁,从来在shuffle和sort阶段很费时费力。实现二就简单简单解决了这个问题

实现二



该实现在mapper端词频做了统计,然后再提交各Reducer,这样就大大减少了mapper的提交次数。这样还是存在问题的。
基本的MapReduce执行框架不能保证Mapper端提交的value被发送到reducer的排序问题,就是说,不能保证在Reducer端相同的key所对应的value列表的值是有序的。reducer端对于相同的key必须缓存其值列表。然后再在内存进行排序,最后在写到磁盘,这样很有可能会导致reducer端out-of-memory 。

一个比较好的解决办法是让MapReduce框架帮我们做value的排序工作,以此来保证传递给reducer的相同的key所对应的values列表值是有序的。在Mapper端用提交以下形式来代替原先的de

(tuple <term t, docis n>, tf)

这是一种比较典型的MapReduce value-to-key转换的设计模型,这样的话,在Reducer端缓存的postings列表的内存使用率将大大降低,从而减少out-of-memory发生,具体实现如下伪代码:



Code

/**
   * input format
   *    docid<tab>doc content
   *    
   * output format
   *    (term:docid)<tab>(tf in this doc)
   *
   */
  public static class InvertedListMap extends Mapper<IntWritable/*docid*/, Text/*doc content*/, Text, IntWritable> {

    @Override
    protected void map(IntWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      HashMap<Text, IntWritable> freqs = new HashMap<Text, IntWritable> ();
      
      // the document can be preprocessed by thrid analynized tool first
      // here is simplify this procedure using split by whitespace instead
      String[] terms = value.toString().split(" "); 
      for (String term : terms ) {
        
        if (term == null || "".equals(term)) continue;
        
        if (freqs.containsKey(new Text(term))) {
          int tf = freqs.get(new Text(term)).get();
          freqs.put(new Text(term), new IntWritable(tf + 1));
        } else {
          freqs.put(new Text(term), new IntWritable(1));
        }
      } // end of for loop
      
      Iterator<Map.Entry<Text, IntWritable>> entryIter = (Iterator<Entry<Text, IntWritable>>) freqs.entrySet().iterator();
      while (entryIter.hasNext()) {
        Map.Entry<Text, IntWritable> entry = entryIter.next();
        Text tuple = new Text();
        tuple.set(entry.getKey().toString() + ":" + key.toString());
        context.write(tuple, freqs.get(entry.getKey()));
      }
    }
    
  }
  
  public static class InvertedListReduce extends Reducer <Text, IntWritable, Text, Map<IntWritable, IntWritable>> {

    private String term = null;
    private Map<IntWritable, IntWritable> posting = null;
    
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      term = null;
      posting = new TreeMap<IntWritable, IntWritable>();
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      context.write(new Text(term), posting);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      String[] tuple = key.toString().split(":");
      if (term != null && !term.equalsIgnoreCase(tuple[0])) {
        context.write(new Text(tuple[0]), posting);
        posting.clear();
      } else {
        for (IntWritable val : values) {
          posting.put(new IntWritable(Integer.parseInt(tuple[1])), val);
        }
        
        term = key.toString();
      }
    }
    
  }


今天就到这了。关于如何优化Map和Reduce,Google MapReduce的 Pairs 和 Stripes优化过程[3]

后续考虑一下这个MapReduce倒排索引同时进行索引压缩存储。

Reference

1、http://blog.csdn.net/hguisu/article/details/7962350

2、2010 - Jimmy Lin - Data-Intensive Text Processing with MapReduce

3、http://chandramanitiwary.wordpress.com/2012/08/19/map-reduce-design-patterns-pairs-stripes/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: