若返回要求为
JavaPairRDD<K,V>
,Tuple2<K,V>
即可
1 说明
本实验测试spark
实现top10
。
思路:先对数据进行mapPartitions
对每个区取top10
得到partition
,然后对上述partition
(即每个区的top10
)取得的结果进行top10
汇总。
当数据存放在一个分区时,本地partirion
和最终汇总效果一直;
当有多分区时,每个分区都会取各自的top10
。
本章使用的mapPartitions()
见上一篇博客。
本章知识
方法 | 描述 |
---|---|
foreach() |
对RDD使用,每个RDD 都执行对分区使用,则每个分区都执行 |
collect() |
对n 分区使用,则生成长度为n 的List 每个元素为整个分区 如: List<TreeMap<Integer, String>> allTop10 = partitions.collect(); |
reduce |
static T reduce(Function2<T,T,T> f) 参数中的前两个T都是输入(满足交换律结合律),第三个T是输出 |
1 唯一key单分区和多分区测试
输入数据
21行,测试单分区top10
和2分区top10
chap02-top10.txt
cat1,12
cat2,13
cat3,14
cat4,15
cat5,10
cat100,100
cat200,200
cat300,300
cat1001,1001
cat67,67
cat22,22
cat23,23
cat1000,1000
cat2000,2000
cat400,400
cat500,500
c1,1
c2,2
c3,3
c4,4
c5,555
核心流程
/ 1 / 说明
上述标号中的
① 核心代码:
pairs.mapPartitions(
new FlatMapFunction<
Iterator<Tuple2<String,Integer>>,//对分区操作,输入为可迭代
TreeMap<Integer, String> //输出
>() {
...
}
② 对1分区和2分区测试
1分区:
JavaRDD<String> lines = sc.textFile("chap02-top10.txt");
2分区:
JavaRDD<String> lines = sc.textFile("chap02-top10.txt",2);
/ 1 / 代码
package cn.whbing.spark.dataalgorithms.chap02;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import com.google.common.collect.Lists;
import cn.whbing.spark.dataalgorithms.chap01.util.TupleComparator;
import scala.Tuple2;
/**
*
* @author whbing
*/
public class Top10 {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("SecondarySort by spark").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//读取每一行即<cat_id><,><weight>
JavaRDD<String> lines = sc.textFile("chap02-top10.txt",2); //***
//转化成键值对形式--> <catId,weight>
JavaPairRDD<String, Integer> pairs =
lines.mapToPair(new PairFunction<String,String,Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
//对输入s进行转换
String[] tokens = s.split(",");//s即cat1,2 以逗号分割
String catId = tokens[0];
Integer weight = new Integer(tokens[1]);
return new Tuple2<String,Integer>(catId,weight);
}
});
/*
* 思路:将RDD以weight为key放入TreeMap中(默认升序),只留后10个即可
*/
// STEP-5: create a local top-10
JavaRDD<TreeMap<Integer, String>> partitions = pairs.mapPartitions(
new FlatMapFunction<Iterator<Tuple2<String,Integer>>, TreeMap<Integer, String>>() {
@Override
public Iterator<TreeMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
TreeMap<Integer, String> top10 = new TreeMap<Integer, String>();
while (iter.hasNext()) {
Tuple2<String,Integer> tuple = iter.next();
top10.put(tuple._2, tuple._1);
// keep only top N
if (top10.size() > 10) {
top10.remove(top10.firstKey());
}
}
//以下两种均可,map转iterator
//return Collections.singletonList(top10).iterator();
return Lists.newArrayList(top10).iterator();
}
});
//验证本地top10
//当有多个分区时(本测试为2,见input后参数),每个partition都会执行以下操作
System.out.println("验证本地top10 partitions:");
partitions.foreach(new VoidFunction<TreeMap<Integer,String>>() {
@Override
public void call(TreeMap<Integer, String> m) throws Exception {
for(Map.Entry<Integer, String> entry: m.entrySet()){
System.out.println(entry.getKey()+":"+entry.getValue());
}
}
});
}
}
/ 2 / 结果:
1分区时结果:
INFO SparkContext: Starting job: foreach at Top10.java:82
验证本地top10 partitions:
67:cat67
100:cat100
200:cat200
300:cat300
400:cat400
500:cat500
555:c5
1000:cat1000
1001:cat1001
2000:cat2000
结果表明:1分区时就是最终结果
2分区时结果:
INFO SparkContext: Created broadcast 0 from textFile at Top10.java:39
验证本地top10 partitions:
10:cat5
12:cat1
13:cat2
14:cat3
15:cat4
67:cat67
100:cat100
200:cat200
300:cat300
1001:cat1001
2:c2
3:c3
4:c4
22:cat22
23:cat23
400:cat400
500:cat500
555:c5
1000:cat1000
2000:cat2000
结果表明:每个分区都有对自己区top10
的结果。
2 分区汇总 :最终top10
迭代处理每个分区的top10
结果,创建最终的top10
2.1 汇总方案1:collect到集合
//汇总方案1:collect到集合
/**
* partition1 partition2
* | |
* collect()
* |
* List<partition1或2的类型>
* 说明:partition1或2类型一致,List的长度为分区个数,2分区即为2
*/
//用于保存最终top10
TreeMap<Integer,String> finalTop10 = new TreeMap<Integer,String>();
List<TreeMap<Integer, String>> allTop10 = partitions.collect();
for(TreeMap<Integer,String> treeMap : allTop10){ //List
//每次循环的treeMap为每个分区的treeMap
//对每个分区的treeMap遍历
for(Map.Entry<Integer, String> entry : treeMap.entrySet()){
finalTop10.put(entry.getKey(), entry.getValue());
//只保留前10
if(finalTop10.size()>10){
finalTop10.remove(finalTop10.firstKey());//默认从小到大排序
}
}
}
//验证finalTop10,打印finalTop10集合即可
System.out.println("====汇总方案1,集合方法,finalTop10结果:====");
for(Integer key:finalTop10.keySet()){
System.out.println(key+" -> "+finalTop10.get(key));
}
结果:
====汇总方案1,collect到集合方法,finalTop10结果:====
67 -> cat67
100 -> cat100
200 -> cat200
300 -> cat300
400 -> cat400
500 -> cat500
555 -> c5
1000 -> cat1000
1001 -> cat1001
2000 -> cat2000
2.1 汇总方案2:reduce到集合
//汇总方案2:reduce到集合
//用于保存最终top10
TreeMap<Integer,String> finalTop10_2 = new TreeMap<Integer,String>();
partitions.reduce(new Function2<
TreeMap<Integer,String>,
TreeMap<Integer,String>,
TreeMap<Integer,String>>() {
@Override
public TreeMap<Integer, String> call(TreeMap<Integer, String> m1, TreeMap<Integer, String> m2) throws Exception {
//m1,partitionK-1
for(Map.Entry<Integer, String> entry:m1.entrySet()){
finalTop10_2.put(entry.getKey(), entry.getValue());
if(finalTop10_2.size()>10){
finalTop10_2.remove(finalTop10_2.firstKey());
}
}
//m2,partitionK
for(Map.Entry<Integer, String> entry:m2.entrySet()){
finalTop10_2.put(entry.getKey(), entry.getValue());
if(finalTop10_2.size()>10){
finalTop10_2.remove(finalTop10_2.firstKey());
}
}
return finalTop10_2;
}
});
//方案2验证finalTop10,打印finalTop10_2集合即可
System.out.println("====汇总方案2,reduce到集合方法,finalTop10_2结果:====");
for(Integer key:finalTop10_2.keySet()){
System.out.println(key+" -> "+finalTop10_2.get(key));
}
结果
====汇总方案2,reduce到集合方法,finalTop10_2结果:====
67 -> cat67
100 -> cat100
200 -> cat200
300 -> cat300
400 -> cat400
500 -> cat500
555 -> c5
1000 -> cat1000
1001 -> cat1001
2000 -> cat2000