Wiki网页PageRank算法
系统采用vm下ubuntu16.04
一、 实验内容与要求
在Eclipse环境下编写实现Wiki网页数据集的PageRank算法,列出前30个最高PageRank的网页信息输出列表。
二、 实验数据
在网上找数据的时候没有找到,从师兄那里获取了wiki_0.txt,文件大小约为30MB,该数据包含了维基网页自身的描述信息,如网页标题、ID以及它所指向的其他网页。将其传入hdfs中。
DFS Locations
|--hadoopTest
|--(1)
|--user(1)
|--hk(1)
|--wikipagerank_in(1)
|--wiki_0.txt
三、 设计思想
PageRank的随机浏览模型:假定一个上网者从一个随机的网页开始浏览,上网者不断点击当前网页的链接开始下一次浏览。但是,上网者有时会感到厌倦,随机跳转到一个新的网页重新开始浏览。在这种模型下,随机上网者访问某个网页的概率就等于这个网页的PageRank值。这个模型更加接近于用户的真实行为。
用MapReduce实现PageRank主要通过三个步骤:
(1)GraphBuilder:分析原始数据,建立各个网页之间的关系。
URL:网页URL, RP_init:RP初始值,link_list:网页出度列表
Map:逐行分析原始数据,输出<URL,(RP_init,link_list)>
Reduce:不做任何处理,输出<URL,(RP_init,link_list)>
(2)PageRankIter:迭代计算各个网页的PageRank值,直到PR值收敛或迭代预定次数。
Map:对上阶段的Reduce输出,产生两种<key,value>对:
1、对于出度链表link_list中的每一个网页u,输出键值对<u,cur_rank/|link_list|>
2、在迭代过程中,传递每个网页的链接信息<URL,link_list>(维护图结构)
Reduce:对Map输出的<URL,url_list>和多个<URL,val>做如下处理(<URL,url_list>为当前URL的链出信息,<URL,val>为当前URL的链入网页对其贡献的PageRank值):
计算所有val的和,并乘上d,再加上常数(1-d)/N得到new_rank。输出<URL,(new_rank,link_list)>
(3)RankViewer:将最终结果排序输出。读取最后一次迭代的结果文件,解析出每个网页名及其PR值,并以PR值为key、网页名为value,以PR值从大到小的顺序输出。排序过程中可以利用框架自身的排序处理:重载key的比较函数,使其经过shuffle和sort后反序(从大到小)输出。
四、 代码实现
(1)GraphBuilder类
public class GraphBuilder {
public static class GraphBuilderMapper extends
Mapper<LongWritable, Text, Text, Text> {
private static final Pattern wikiLinksPatern = Pattern.compile("\\[.+?\\]");
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String pagerank = "1.0\t";
boolean first = true;
String[] titleAndText = parseTitleAndText(value);
String pageName = titleAndText[0];
Text page = new Text(pageName.replace(',', '_')); // 得到网页的名称
Matcher matcher = wikiLinksPatern.matcher(titleAndText[1]);
while (matcher.find()) {
String otherPage = matcher.group();
otherPage = getWikiPageFromLink(otherPage);
if (otherPage == null || otherPage.isEmpty())
continue;
StringTokenizer itr = new StringTokenizer(otherPage.toString(), "\n");
for (; itr.hasMoreTokens();) {
if (!first)
pagerank += ",";
pagerank += itr.nextToken();
first = false;
}
}
context.write(page, new Text(pagerank));
}
private String[] parseTitleAndText(Text value) throws IOException {
String[] titleAndText = new String[2];
int start = value.find("<title>");
start += 11;
int end = value.find("</title>", start);
if (start == -1 || end == -1)
return new String[] { "", "" };
titleAndText[0] = Text.decode(value.getBytes(), start, end - start); // getBytes()方法得到字符编码方式
start = value.find("<text xml:space");
start += 17;
end = value.find("</text>", start);
if (start == -1 || end == -1)
return new String[] { "", "" };
titleAndText[1] = Text.decode(value.getBytes(), start, end - start);
return titleAndText;
}
private String getWikiPageFromLink(String aLink) {
if (isNotWikiLink(aLink))
return null;
int start = aLink.startsWith("[[") ? 2 : 1;
int endLink = aLink.indexOf("]");
int pipePosition = aLink.indexOf("|");
if (pipePosition > 0) {
endLink = pipePosition;
}
int part = aLink.indexOf("#");
if (part > 0) {
endLink = part;
}
aLink = aLink.substring(start, endLink);
aLink = aLink.replaceAll("\\s", "_");
aLink = aLink.replaceAll(",", "");
if (aLink.contains("&"))
aLink.replaceAll("&", "&");
return aLink;
}
/** 判断是否是wiki百科内部的链接 **/
private boolean isNotWikiLink(String aLink) {
int start = aLink.startsWith("[[") ? 2 : 1;
if (aLink.length() < start + 2 || aLink.length() > 100)
return true;
char firstChar = aLink.charAt(start);
if (firstChar == '#')
return true;
if (firstChar == ',')
return true;
if (firstChar == '.')
return true;
if (firstChar == '&')
return true;
if (firstChar == '\'')
return true;
if (firstChar == '-')
return true;
if (firstChar == '{')
return true;
if (aLink.contains(":"))
return true;
if (aLink.contains(","))
return true;
if (aLink.contains("&"))
return true;
return false;
}
}
public static class GraphBuilderReducer extends
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Text value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("mapred.job.tracker", "localhost:9001");
Job job1 = new Job(conf, "Graph Builder");
job1.setJarByClass(GraphBuilder.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
job1.setMapperClass(GraphBuilderMapper.class);
job1.setReducerClass(GraphBuilderReducer.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
job1.waitForCompletion(true);
}
}
(2)PageRankIter类
public class PageRankIter {
private static final double damping = 0.85;
public static class PRIterMapper extends
Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] tuple = line.split("\t");
String pageKey = tuple[0];
double pr = Double.parseDouble(tuple[1]);
if (tuple.length > 2) {
String[] linkPages = tuple[2].split(",");
for (String linkPage : linkPages) {
String prValue =
pageKey + "\t" + String.valueOf(pr / linkPages.length);
context.write(new Text(linkPage), new Text(prValue));
}
context.write(new Text(pageKey), new Text("|" + tuple[2]));
}
}
}
public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String links = "";
double pagerank = 0;
for (Text value : values) {
String tmp = value.toString();
if (tmp.startsWith("|")) {
links = "\t" + tmp.substring(tmp.indexOf("|") + 1);
continue;
}
String[] tuple = tmp.split("\t");
if (tuple.length > 1)
pagerank += Double.parseDouble(tuple[1]);
}
pagerank = (double) (1 - damping) + damping * pagerank; // PageRank的计算迭代公式
context.write(new Text(key), new Text(String.valueOf(pagerank) + links));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("mapred.job.tracker", "localhost:9001");
Job job2 = new Job(conf, "PageRankIter");
job2.setJarByClass(PageRankIter.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setMapperClass(PRIterMapper.class);
job2.setReducerClass(PRIterReducer.class);
FileInputFormat.addInputPath(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
job2.waitForCompletion(true);
}
}
(3)PageRankView类
public class PageRankViewer {
public static class PageRankViewerMapper extends
Mapper<LongWritable, Text, FloatWritable, Text> {
private Text outPage = new Text();
private FloatWritable outPr = new FloatWritable();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
String page = line[0];
float pr = Float.parseFloat(line[1]);
outPage.set(page);
outPr.set(pr);
context.write(outPr, outPage);
}
}
//重载key的比较函数,使其经过shuffle和sort后反序(从大到小)输出
public static class DescFloatComparator extends FloatWritable.Comparator {
// @Override
public float compare(WritableComparator a,
WritableComparable<FloatWritable> b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("mapred.job.tracker", "localhost:9001");
Job job3 = new Job(conf, "PageRankViewer");
job3.setJarByClass(PageRankViewer.class);
job3.setOutputKeyClass(FloatWritable.class);
job3.setSortComparatorClass(DescFloatComparator.class);
job3.setOutputValueClass(Text.class);
job3.setMapperClass(PageRankViewerMapper.class);
FileInputFormat.addInputPath(job3, new Path(args[0]));
FileOutputFormat.setOutputPath(job3, new Path(args[1]));
job3.waitForCompletion(true);
}
}
(4)PageRankMain类
/**
 * Driver: chains the three jobs — build the graph, iterate PageRank a fixed
 * number of times, then emit the ranks in descending order. Iteration i reads
 * wikipagerank_out/Data(i) and writes wikipagerank_out/Data(i+1).
 */
public class PageRankMain {
private static int times = 15; // fixed number of PageRank iterations
public static void main(String[] args) throws Exception {
String[] myArgs = { "wikipagerank_in", "wikipagerank_out" };
// Step 1: parse the raw dump into the initial link graph (Data0).
GraphBuilder.main(new String[] { myArgs[0], myArgs[1] + "/Data0" });
// Step 2: run the rank iteration, Data(i) -> Data(i+1).
for (int i = 0; i < times; i++) {
String src = myArgs[1] + "/Data" + i;
String dst = myArgs[1] + "/Data" + (i + 1);
PageRankIter.main(new String[] { src, dst });
}
// Step 3: sort the final iteration's ranks in descending order.
PageRankViewer.main(
new String[] { myArgs[1] + "/Data" + times, myArgs[1] + "/FinalRank" });
}
}
五、 实验结果
输出结果和代码里面设置的迭代次数有关