版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/macanv/article/details/78100023
PageRank 的spark版本
本篇记录使用原始的RDD自己手动写一个pageRank算法,主要是练习map, sort, groupByKey, mapValues, join的操作
算法细节可以参考《数据挖掘十大算法》中的 PageRank 章节:https://wizardforcel.gitbooks.io/dm-algo-top10/content/
Java实现
package RDD;
import com.google.common.collect.Iterables;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Created by Macan on 2017/9/26.
*/
/**
 * Hand-rolled PageRank on plain RDDs, exercising mapToPair, groupByKey,
 * mapValues, join and reduceByKey.
 *
 * Usage: PageRank &lt;linksFile&gt; [iterations]
 *   linksFile  - text file with one edge per line: "&lt;srcUrl&gt; &lt;dstUrl&gt;"
 *   iterations - optional number of iterations (default 100)
 */
public class PageRank implements Serializable {
    public static void main(String[] args) {
        // Validate arguments BEFORE allocating a SparkContext, so a usage
        // error does not spin up (and then leak) a local cluster.
        if (args.length < 1) {
            System.out.println("should assigned parameter of page info");
            System.exit(-1);
        }
        // Backward-compatible generalization: iteration count used to be a
        // hard-coded 100; it can now optionally be passed as args[1].
        final int iterations = args.length > 1 ? Integer.parseInt(args[1]) : 100;

        SparkConf sc = new SparkConf().setMaster("local[*]").setAppName("PageRank");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        try {
            JavaRDD<String> data = jsc.textFile(args[0]);
            // Build the link graph: "<src> <dst>" lines -> (src, [dst...]).
            // Malformed lines (fewer than two space-separated tokens) are
            // dropped instead of throwing ArrayIndexOutOfBoundsException.
            JavaPairRDD<String, Iterable<String>> links = data.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String s) throws Exception {
                    return s.split(" ").length >= 2;
                }
            }).mapToPair(new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String s) throws Exception {
                    String[] urls = s.split(" ");
                    return new Tuple2<>(urls[0], urls[1]);
                }
            }).distinct().groupByKey().cache();

            // Every page starts with an initial rank of 1.0.
            JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
                @Override
                public Double call(Iterable<String> strings) throws Exception {
                    return 1.0;
                }
            });

            // Iteratively propagate rank along out-links. NOTE(review): a
            // production version should also stop early on convergence rather
            // than always running the full iteration count.
            for (int i = 0; i < iterations; ++i) {
                // Each page sends rank / outDegree to every page it links to.
                JavaPairRDD<String, Double> contribs = links.join(ranks).values()
                        .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
                            @Override
                            public Iterable<Tuple2<String, Double>> call(
                                    Tuple2<Iterable<String>, Double> linksAndRank) throws Exception {
                                int urlCount = Iterables.size(linksAndRank._1());
                                List<Tuple2<String, Double>> results = new ArrayList<>();
                                for (String url : linksAndRank._1) {
                                    results.add(new Tuple2<String, Double>(url, linksAndRank._2 / urlCount));
                                }
                                return results;
                            }
                        });

                // New rank = 0.15 + 0.85 * (sum of incoming contributions).
                ranks = contribs.reduceByKey(new Function2<Double, Double, Double>() {
                    @Override
                    public Double call(Double a, Double b) throws Exception {
                        return a + b;
                    }
                }).mapValues(new Function<Double, Double>() {
                    @Override
                    public Double call(Double contribSum) throws Exception {
                        return 0.15 + contribSum * 0.85;
                    }
                });
            }

            // Print the final PageRank value of every page.
            List<Tuple2<String, Double>> out = ranks.collect();
            for (Tuple2<String, Double> tuple : out) {
                System.out.println(tuple._1 + "\t " + tuple._2);
            }
        } finally {
            // Always release the SparkContext, even if the job fails —
            // the original leaked it on every exit path.
            jsc.stop();
        }
    }
}