基本设计
设有一个英文语句:
- 设共现窗口定义为连续出现的两个单词,如下图给出上句英文的共现矩阵。
单词共现算法实现(伪代码)
Map端伪代码如下:
class Mapper
method Map(dociddid, doc d)
for all word w 属于 d
for all word u 属于 Window(w)
//发射出现计数 1
Emit(pair (w, u), 1)
Reduce 端伪代码如下:
class Reducer
method Reduce(pair p; countlist [c1, c2,..])
s = 0
for all count c in countlist [c1, c2, ...]
s = s+ c
Emit(pair p, count s)
- 上述Mapper伪代码中使用了一个window定义,表示如果单词w的窗口u 属于w的窗口内,则认为是(u,w)的一次出现。这里的窗口Windows可以根据不同的应用需求有不同的定义,比例,可以定义为一个固定大小的窗口,或者是前者相连出现、在同义句中、在同一段落中出现的单词等。
- 例如,如果窗口中的单词为[w1, w2, w3],我们发射((w1,w2),1)和((w1,w3),1)出去然后窗口向后移动一个单词。REduce阶段则对发来的相同键的值进行简单的求和即可。这里单词顺序有无关系需要看具体的情况而定。另外,在实际实现中我们需要传入Map的数据时以一个文本为单位的,这里需要实现一个 以便一个文本呗拆分被整个传入到一个Map节点。
例如,
在Map阶段,一个Map节点接受到如图所示的一个文档的内容,
窗口大小为7
,那么首先窗口先覆盖了 ,然后,该结点将键值对 , , , , 发射出去。随后窗口向后滑动一格,与上面相似,这时将 , , , , , ,发射出去。最后再向后滑动一一个单词至文档的末尾,与上面相似,发送相应的键值对出去。当窗口尾部已经到达文档尾部时,滑动窗口则通过将窗口头部向后“缩进”来进行,此过程一直进行到窗口大小为2停止。
单词共现算法实现中的细节问题
- 发射出去的单词对需要自定义以个 类,该类需要实现 接口。需要重写自定义类WordPair的hashCode()方法,使得相同的 主键都被发送到相同的 节点去。另外,我们还需要重写 和equals()方法使得相同的 键的值可以比较大小和排序。
- WordPair主要代码:
public class WordPair implements WritableComparable<WordPair>{
private String wordA;
private String wordB;
public WordPair(){
}
public WordPair(String wordA,String wordB){
this.wordA = wordA;
this.wordB = wordB;
}
public String getWordA(){
return this.wordA;
}
public String getWordB(){
return this.wordB;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(wordA);
out.writeUTF(wordB);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
wordA = in.readUTF();
wordB = in.readUTF();
}
@Override
public String toString(){
return wordA + "," + wordB;
}
@Override
public int compareTo(WordPair o) {
if(this.equals(o))
return 0;
else
return (wordA + wordB).compareTo(o.getWordA() + o.getWordB());
}
@Override
public boolean equals(Object o){
//无序对,不用考虑顺序
if(!(o instanceof WordPair))
return false;
WordPair w = (WordPair)o;
if((this.wordA.equals(w.wordA) && this.wordB.equals(w.wordB))
|| (this.wordB.equals(w.wordA) && this.wordA.equals(w.wordB)))
return true;
return false;
}
@Override
public int hashCode(){
return (wordA.hashCode() + wordB.hashCode()) * 17;
}
}
- Mapper主要代码:
private int windowSize;
private Queue<String> windowQueue = new LinkedList<String>();
@Override
protected void setup(Context context) throws IOException,InterruptedException{
windowSize = Math.min(context.getConfiguration().getInt("window", 2) , MAX_WINDOW);
}
/**
* 输入键位文档的文件名,值为文档中的内容的字节形式。
*
*/
@Override
public void map(Text docName, BytesWritable docContent, Context context)throws
IOException, InterruptedException{
Matcher matcher = wordPattern.matcher(new String(docContent.getBytes(),"UTF-8"));
while(matcher.find()){
windowQueue.add(matcher.group());
if(windowQueue.size() >= windowSize){
//对于队列中的元素[q1,q2,q3...qn]发射[(q1,q2),1],[(q1,q3),1],
//...[(q1,qn),1]出去
Iterator<String> it = windowQueue.iterator();
String w1 = it.next();
while(it.hasNext()){
String next = it.next();
context.write(new WordPair(w1, next), one);
}
windowQueue.remove();
}
}
if(!(windowQueue.size() <= 1)){
Iterator<String> it = windowQueue.iterator();
String w1 = it.next();
while(it.hasNext()){
context.write(new WordPair(w1,it.next()), one);
}
}
}