本章待完善。
有问题待解答。
broadcast()
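共享全局变量:实验表明,无论是本地模式还是集群模式,任务中都可以直接读取本例的全局变量 topN,那为什么还需要广播呢?原因在于 topN 是 static final 的编译期常量,编译器会把它的值直接内联到使用处,因此每个 executor 上都能得到相同的值;而在运行时才确定、或在 driver 端被修改过的静态变量并不会同步到 executor,体积较大的只读数据若随闭包发送,则每个任务都要序列化一份。这两种情况才需要使用 broadcast()。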
broadcast() |
---|
本测试的提交脚本如下:
cd $SPARK_HOME
#top10
./bin/spark-submit --class cn.whbing.spark.dataalgorithms.chap02.TopN --master spark://master-1a:7077 /home/whbing/algorithms/dataalgorithms-0.0.1-SNAPSHOT.jar hdfs:///whbing/algorithms/chap02-top10.txt
package cn.whbing.spark.dataalgorithms.chap02;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import com.google.common.collect.Lists;
import cn.whbing.spark.dataalgorithms.chap01.util.TupleComparator;
import scala.Tuple2;
import scala.collection.generic.BitOperations.Int;
/**
 * Spark driver that prints the {@link #topN} heaviest {@code <catId>,<weight>} records
 * of a text file.
 *
 * <p>Algorithm: every partition builds its own top-N in a {@link TreeMap} keyed by weight
 * (ascending order, so the smallest key is evicted first); the per-partition maps are then
 * merged pairwise with {@code reduce} and the merged map is printed on the driver.
 *
 * <p>NOTE: because the weight is the map key, two categories with the same weight collide
 * and only the one inserted last is kept.
 *
 * <p>About "global variables" versus broadcast: {@link #topN} can be read inside the Spark
 * functions in both local and cluster mode because it is a {@code static final} constant
 * initialised with a literal, so the compiler inlines its value at every use site. That does
 * not hold for state computed or modified on the driver at run time: executors work on their
 * own copies, and updates made there are not sent back. Runtime read-only data that tasks
 * need should be shipped with a broadcast variable instead.
 *
 * <p>Submit script used for this test:
 * <pre>
 * cd $SPARK_HOME
 * # top10
 * ./bin/spark-submit --class cn.whbing.spark.dataalgorithms.chap02.TopN \
 *     --master spark://master-1a:7077 \
 *     /home/whbing/algorithms/dataalgorithms-0.0.1-SNAPSHOT.jar \
 *     hdfs:///whbing/algorithms/chap02-top10.txt
 * </pre>
 *
 * @author whbing
 */
public class TopN {

    /** Number of records kept in every local top list and in the final result. */
    public static final int topN = 6;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the path of the input file; each line must have the
     *             form {@code <catId>,<weight>} with an integer weight
     */
    public static void main(String[] args) {
        // Validate the arguments; exit code 1 signals abnormal termination.
        if (args.length < 1) {
            System.err.println("Usage: TopN <file>");
            System.exit(1);
        }

        String inputPath = args[0];
        System.out.println("args[0]:<file>="+args[0]);

        SparkConf conf = new SparkConf().setAppName("TopN by spark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // One record per line: <catId>,<weight>
            JavaRDD<String> lines = sc.textFile(inputPath, 2);

            // Convert every line into a (catId, weight) pair.
            JavaPairRDD<String, Integer> pairs =
                lines.mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        String[] tokens = s.split(",");
                        if (tokens.length < 2) {
                            throw new IllegalArgumentException(
                                "Malformed input line, expected <catId>,<weight>: " + s);
                        }
                        String catId = tokens[0];
                        Integer weight = Integer.valueOf(tokens[1].trim());
                        return new Tuple2<String, Integer>(catId, weight);
                    }
                });

            // Build a local top-N per partition; each partition emits exactly one map.
            JavaRDD<TreeMap<Integer, String>> partitions = pairs.mapPartitions(
                new FlatMapFunction<Iterator<Tuple2<String, Integer>>, TreeMap<Integer, String>>() {
                    @Override
                    public Iterator<TreeMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iter) {
                        TreeMap<Integer, String> localTop = new TreeMap<Integer, String>();
                        while (iter.hasNext()) {
                            Tuple2<String, Integer> tuple = iter.next();
                            putAndTrim(localTop, tuple._2, tuple._1);
                        }
                        return Collections.singletonList(localTop).iterator();
                    }
                });

            // Merge the local lists. The function is pure: it builds and returns a new map
            // instead of writing into a driver-side variable captured by the closure, so
            // the result is the value returned by reduce() no matter where (executor or
            // driver) and how often the function is invoked.
            TreeMap<Integer, String> finalTopN = partitions.reduce(
                new Function2<
                        TreeMap<Integer, String>,
                        TreeMap<Integer, String>,
                        TreeMap<Integer, String>>() {
                    @Override
                    public TreeMap<Integer, String> call(TreeMap<Integer, String> m1,
                                                         TreeMap<Integer, String> m2) throws Exception {
                        TreeMap<Integer, String> merged = new TreeMap<Integer, String>();
                        mergeInto(merged, m1);
                        mergeInto(merged, m2);
                        return merged;
                    }
                });

            // Print the final result in ascending weight order.
            System.out.println("====汇总方案2,reduce到集合方法,finalTop10_2结果:====");
            for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
                System.out.println(entry.getKey()+" -> "+entry.getValue());
            }
        } finally {
            // Always release the Spark context, even when the job fails.
            sc.stop();
        }
    }

    /** Adds every entry of {@code source} to {@code target}, keeping at most {@link #topN} entries. */
    private static void mergeInto(TreeMap<Integer, String> target, TreeMap<Integer, String> source) {
        for (Map.Entry<Integer, String> entry : source.entrySet()) {
            putAndTrim(target, entry.getKey(), entry.getValue());
        }
    }

    /** Inserts one record and evicts the smallest weight once the map exceeds {@link #topN} entries. */
    private static void putAndTrim(TreeMap<Integer, String> top, Integer weight, String catId) {
        top.put(weight, catId);
        if (top.size() > topN) {
            top.remove(top.firstKey());
        }
    }
}
实验证明运行正常。