MapReduce--倒排索引

文档倒排算法简介

Inverted Index(倒排索引)是目前几乎所有支持全文检索的搜索引擎都要依赖的一个数据结构。基于索引结构,给出一个词(term),能取得含有这个term的文档列表(the list of documents) 
Web Search中的问题主要分为三部分:

  1. crawling(gathering web content) ,网页爬虫,收集数据
  2. indexing(construction of the inverted index) ,根据大量数据构建倒排索引结构
  3. retrieval(ranking documents given a query),根据一个搜索单词进行索引并对结果进行排序,比如可以根据词频多少来排 
    crawling和indexing都是离线的,retrieval是在线、实时的。 
    此处有个问题,索引结构会如何进行存储呢? 
    给定一个单词,如何快速得到结果呢? 
    一般可以采用两种存储方式,一种是hash链表,还有一种则是B(B+)树。 
    说起存储我就想到我同学面试时被问到一个问题:a~z26个单词如何存储能快速索引? 
    (⊙﹏⊙)b居然是26叉树,好丧心病狂啊!

基本的倒排索引结构

这里写图片描述

实验任务

请实现课堂上介绍的“带词频属性的文档倒排算法”。 
在统计词语的倒排索引时,除了要输出带词频属性的倒排索引,还请计算每个词语的“平均 
提及次数”(定义见下)并输出。 
“平均提及次数”在这里定义为: 
平均提及次数= 词语在全部文档中出现的频数总和/ 包含该词语的文档数 
假如文档集中有四本小说:A、B、C、D。词语“江湖”在文档A 中出现了100 次,在文档B 
中出现了200 次,在文档C 中出现了300 次,在文档D 中没有出现。则词语“江湖”在该文 
档集中的“平均提及次数”为(100 + 200 + 300) / 3 = 200。

输出格式 
对于每个词语,输出两个键值对,两个键值对的格式如下: 
[词语] \TAB 词语1:词频, 词语2:词频, 词语3:词频, …, 词语100:词频 
[词语] \TAB 平均提及次数 
下图展示了输出文件的一个片段(图中内容仅为格式示例): 
这里写图片描述

设计

倒排索引可以看做是wordcount的拓展,它需要统计一个单词在多个文件中出现的次数,那么它的Mapper和Reducer该如何设计呢? 
很自然地我们会想到 
Mapper:

  • 对于文件file中任一word,
  • Key = word, Value = fileName + 1.

Reduer:

  • 对于输入Key, Iterable(Text) Values,
  • 统计Values中每个Value,记录出现的fileName以及频数.

这里有个问题,它需要假定对于一个相同的Key,Mapper给出的输出

design trick: value-to-key conversion

Value到Key的转换 
比如说对于原来的(term, (docid, tf))可以将value中的docid放到key中从而得到 
新的键值对((term, docid), tf)。 
这样具有相同key值的键值对数目就降低啦!

关于Mapper里边的代码我遇到两个问题: 
1.用空格 ” “来对line做分词,在最后的输出结果里边会出现空白单词,很奇怪,虽然我在输出的时候加了token为” “或”\t”的时候都不输出,但是最后结果里边还是有空白单词,匪夷所思诶。 
2.Mapper的输出如果采用((term:docid), tf)的形式,使用“:”来分隔term和docid,那么在Combiner里边如果我使用”:”来分隔key(也就是下边错误的Mapper方式),那么得到的String个数有时候长度居然<2,所以我现在使用”->”来进行分隔。

public static class InverseIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String line = value.toString();
            String[] strTokens = line.split(" ");
            FileSplit inputSplit = (FileSplit)context.getInputSplit();
            Path path = inputSplit.getPath();
            String pathStr = path.toString();
            int index = pathStr.lastIndexOf("/");
            String strFileName = pathStr.substring(index + 1); 
            for (String token : strTokens){
                if (token != " " && token != "\t"){
                    context.write(new Text(token + "->" + strFileName), new Text("1"));
                }
            }
        }
    }

Combiner的使用

为了减少Mapper的输出,从而降低Mapper到Reducer的传输开销以及存储开销,使用Combiner是个好方法,相当于是在每个Mapper结束之后先进行一次Reducer将结果汇总一下。 
这里是将相同文档的相同term的词频统计一下。 
我看到过有这样一种处理方法,它为了Reducer方便处理,所以将Mapper的输出从((term, docid), tf)变为((term, (docid, tf))。

public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String strKey = key.toString();
            String[] tokens = strKey.split("->");
            int freq = 0;
            for (Text value : values){
                freq++;
            }
            context.write(new Text(tokens[0]), new Text(tokens[1] + "->" + freq));
        }
    }

正确的如下

public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int freq = 0;
            for (Text value : values){
                freq++;
            }
            context.write(key, new Text("" + freq));
        }
    }

Partitioner的设计

因为value-to-key conversion,Mapper的输出中key变为了(term, docid)。如果采用默认的Partitioner,那么具有相同term,不同docid的项很可能会被划分到不同的Reducer,这与初衷是违背的啊,所以需要自定义一个Partitioner,用key中的term作为划分的依据! 
这里有个小问题,如果我采用Combiner中的错误方式,将Combiner的输出重新变化为了(term, (docid, tf)),那么是否还需要自定义Partitioner了呢? 
答案是需要的,看来Partitioner的判断依据不是Combiner的输出啦!

public static class InverseIndexPartitioner extends HashPartitioner<Text, Text>{

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            // TODO Auto-generated method stub
            String strKey = key.toString();
            String[] tokens = strKey.split("->");
            return super.getPartition(new Text(tokens[0]), value, numReduceTasks);
        }

    }

Reducer的设计

根据Mapper传来的输出( (term, docid), tf),这里需要进行的处理便是将具有相同term的键值对聚集在一起,并重组成( term , (docid1:tf1, docid2:tf2, …) )的输出形式。

  • 采用静态变量strWord来记录上一次reduce过程中的term ;
  • 采用静态变量map记录静态变量strWord对应的docid:tf对;
  • 处理reduce过程时,首先将key分割出term以及fileName,
  • 判断term是否与strWord相等,
  • 如果相等,首先累计额values,得到docid,tf对之后加入map;
  • 否则将strWord,map输出,并清空map,strWord赋值为term,处理当前docid,tf,并加入map;
  • 因为最后一次reduce过程不可能将它自己的数据输出,所以需要重载cleanup函数在里边进行输出
  • 还有一点需要注意,String的相等判断用“==”是不行的哦,如果用了“==”而不是“equals”,会出现什么后果呢?
public static class InverseIndexReducer extends Reducer<Text, Text, Text, Text>{

        static Map<String, Integer> map = new HashMap<String, Integer>();
        static String strWord = null;
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
                String[] tokens = key.toString().split("->");
                if (strWord == null){
                    strWord = tokens[0];
                } 
                if (strWord.equals(tokens[0])){
                    String strFileName = tokens[1];
                    int freq = 0;
                    for (Text value : values){
                        freq += Integer.parseInt(value.toString());
                    }
                    map.put(strFileName, freq);
                } else {
                    String strNewValue = "";
                    double aveFreq = 0;
                    for (Map.Entry<String, Integer> entry : map.entrySet()){
                        strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
                        aveFreq += (double)entry.getValue();
                    }
                    aveFreq /= (double)map.size();
                    Text newKey = new Text(strWord);
                    map.clear();
                    context.write(newKey, new Text(strNewValue));
                    context.write(newKey, new Text("" + aveFreq));

                    strWord = tokens[0];
                    String strFileName = tokens[1];
                    int freq = 0;
                    for (Text value : values){
                        freq += Integer.parseInt(value.toString());
                    }   
                    map.put(strFileName, freq);
                }

        }
        @Override
        protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub

            String strNewValue = "";
            double aveFreq = 0;
            for (Map.Entry<String, Integer> entry : map.entrySet()){
                strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
                aveFreq += (double)entry.getValue();
            }
            aveFreq /= (double)map.size();
            Text newKey = new Text(strWord);
            map.clear();
            context.write(newKey, new Text(strNewValue));
            context.write(newKey, new Text("" + aveFreq));

            super.cleanup(context);
        }


    }

main函数

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        Job job = new Job(conf, "InverseIndex");
        job.setJarByClass(InverseIndex.class);

        job.setNumReduceTasks(4);

        job.setMapperClass(InverseIndexMapper.class);
        job.setCombinerClass(InverseIndexCombiner.class);
        job.setPartitionerClass(InverseIndexPartitioner.class);
        job.setReducerClass(InverseIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

运行结果

这里写图片描述 
这里写图片描述

猜你喜欢

转载自blog.csdn.net/wdr2003/article/details/80612018