这次的项目比较前面的几个会相对复杂点
项目介绍:这是一个统计页面权重值的MapReduce
PageRank通过网络浩瀚的超链接关系来确定一个页面的等级。Google把从A页面到B页面的链接解释为A页面给B页面投票,Google根据投票来源(甚至来源的来源,即链接到A页面的页面)和投票目标的等级来决定新的等级。简单的说,一个高等级的页面可以使其他低等级页面的等级提升。
用一个例子解释一下
上图中,A、B、C、D各为网站的页面
其中A网站中有B、D的网页地址超链接,用户可以通过A网页跳转到B、D网页,就是所谓的A网页给B、D投票了。
那么A、B、C这三个网站就存在这一定的PageRank值,PageRank值在搜索引擎中的表现就是,PageRank值越高,在搜索引擎中的查询排名就越靠前。
首先,我们通过手工计算一个这4个网页的PageRank
我们假设A、B、C、D的初始值都为1,假设这个1,是他们手上存在的权重。
A得到了C的投票,没有得到其他网页的投票,那我们将重新计算A的PageRank(PR)。
A的PR怎么来呢?我们继续看C,C手上有权重为1的票,这时他把票同时投给了A和B,
那么A得到了C投来的1/2的票,那么,A的权重值变为1/2。
以此类推,B得到了A、C、D三个网站的投票,我们看到,A共投出了2票,C投出了2票、D投出了2票,那么B最终的PR值变为3/2。
…
第一轮投票过后,我们得出各页面的权重值如下:
A: 1/2
B: 3/2
C: 3/2
D: 1/2
他们的权重和为4,和未投票之前相同。
接下来,以第一轮投票的PR值为基础,再次进行第二轮投票,计算方式和第一次一样,只不过是各页面拥有的权重值变了,ok,我们再算一次。
B得到了A、C、D的票,A共投出了2票,A的权重为1/2,那么B从A得到了1/4的权重,C共投出了2票,C的权重为3/2,那么B从C得到了3/4的权重,D共投出了2票,D的权重为1/2,那么B从D得到了1/4的权重,计算得到,B的PR值从3/2变为了5/4。比较第一次的PR差缩小了一倍。
也就是说,我们只要用上述方式进行不断地计算,4个网站的权重值最终为收敛到一个稳定的值,这就是PageRank的算法,以下是公式。
其中N为网页的个数,我们以当前的数据来说可以轻易的看出是4,生产环境中,我们可以跑一次MapReduce进行一次COUNT得到。
q为阈值,因为在实际的网站中,存在那种自己只得到投票,而不投别人的网站,例如博客,于是谷歌计算出了一个值,来避免这些网站的PR值无线增高,而这个q的值为0.85。
L表示的是你给其他网站的投票数量。
那么,我们通过公示来归纳一下我们之前计算的PR值
PR(B) = PR(A)/L(A) + PR(B)/L(B) + PR(C)/L(C)
这就是我们PageRank的原理。
数据:
A B D
B C
C A B
D B C
解释:
以上数据就是我们上图所展示的数据关系,第一行的的A就表示页面A,他把票投给了B和D。
思路:
我们可以自定义数据集,来介绍A的节点与B、D的关系,首先我们先设计一个字符串的模型来解释第一行的数据。
A B D
=>
A:1.0 B D
其中,1.0表示的是A的初始权重,这个权重会随着我们不断重复的MapReduce收敛与一个相对固定的值。
我们在Map中将文本处理成这种格式之后,我们还要解释B、D各自的信息,于是我们还需要创建一下的结构
B:0.5
D:0.5
用来表示B和D在第一行的数据集中,各自得到了来自A的0.5票权重值。
接着,我们将处理完的数据递交reducer,假设产生以下数据:
A:0.5
A:1.0 B D
在reducer中,我们将赋予A全新的权重,并得到一个全新的A:
A:0.5 B D
用于第二次的MapReduce,最终,我们设置一个阈值,假设为0.001。
我们通过不断的比较这一次和上一次的PR差值,直到PR差值小于这个值的时候,我们停止MapReduce,最后一次Job的数据,就是我们想要的最终数据。
关键点在于如何设计自定义bean,以下是我自己思考后创建的bean,大家仅做参考:
public class Node implements WritableComparable {
private String separator = "\t";
private String separator2 = ":";
private String strData;
private String key;
private List<String> voteNode;
private Double weight;
public Node(String data){
strData = data;
}
public boolean unserialize() {
String[] str = StringUtils.split(strData, separator);
List<String> list= Arrays.asList(str);
if (list.size() == 0) {
return false;
}
String tmp = list.get(0);
String[] strs = tmp.split(separator2);
key = strs[0];
weight = Double.parseDouble(strs[1]);
if (list.size() == 1) {
voteNode = null;
} else {
voteNode = list.subList(1, list.size());
}
return true;
}
public String getStrData() {
return strData;
}
public void setStrData(String strData) {
this.strData = strData;
}
public String getKey() {
return key;
}
/**
* 判断这个数据是链数据,还是单节点的权重数据
* 链数据 A:1.0 B C
* 权重数据 A:0.5
* @return
*/
public boolean isChain() {
return voteNode != null;
}
public void setKey(String key) {
this.key = key;
}
public List<String> getVoteNode() {
return voteNode;
}
public void setVoteNode(List<String> voteNode) {
this.voteNode = voteNode;
}
public Double getWeight() {
return weight;
}
public void setWeight(Double weight) {
this.weight = weight;
}
@Override
public int compareTo(Object o) {
Node node = (Node) o;
int res = key.compareTo(node.getKey());
if (res == 0) {
return isChain() ? (node.isChain() ? 0 : 1) : -1;
}
return res;
}
public String toString() {
if (isChain()) {
return key + separator2 + weight + separator
+ StringUtils.join(voteNode.toArray(), separator);
}
return key + separator2 + weight;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
}
@Override
public void readFields(DataInput dataInput) throws IOException {
}
}
完整代码地址
https://github.com/qn9301/bigdata-learn/tree/master/hadoop/src/com/hadoop/learn/pagerank