1. countByKey
Counts the number of elements for each key in a pair RDD and returns the result to the driver as a Map from key to count.
Scala version
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setMaster("local[*]").setAppName("CountByKeyScala")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(("a",2),("a",3),("c",5),("c",7),("b",9)))
println(rdd.countByKey())
//Map(a -> 2, b -> 1, c -> 2)
Java version
import java.util.Arrays;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("CountByKeyJava");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Tuple2<String, Integer>> tupRdd = sc.parallelize(Arrays.asList(new Tuple2<String, Integer>("a", 2),
new Tuple2<String, Integer>("c", 5),
new Tuple2<String, Integer>("a", 5),
new Tuple2<String, Integer>("c", 9),
new Tuple2<String,Integer>("b",8)));
// Convert to a JavaPairRDD
JavaPairRDD<String, Integer> mapRdd = JavaPairRDD.fromJavaRDD(tupRdd);
Map<String, Long> countMapRdd = mapRdd.countByKey();
for (String key : countMapRdd.keySet()) {
System.out.println(key + ":" + countMapRdd.get(key));
}
/*
a:2
b:1
c:2
*/
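The per-key counting that countByKey performs can be sketched without Spark using plain Java collections; the class and method names below (CountByKeyDemo, countByKey) are illustrative only, not part of the Spark API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CountByKeyDemo {
    // Count how many pairs share each key, mirroring countByKey's semantics.
    // A TreeMap is used only to make the printed key order deterministic.
    public static Map<String, Long> countByKey(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream()
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                        TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
                new SimpleEntry<>("a", 2),
                new SimpleEntry<>("a", 3),
                new SimpleEntry<>("c", 5),
                new SimpleEntry<>("c", 7),
                new SimpleEntry<>("b", 9));
        System.out.println(countByKey(pairs)); // {a=2, b=1, c=2}
    }
}
```

Note that only the key matters here: the values 2, 3, 5, 7, 9 are ignored, which is why ("a",2) and ("a",3) both contribute to the count a=2.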
2. collectAsMap
Returns the key-value pairs of a pair RDD to the driver as a Map. If a key appears more than once, only the last value seen for that key is kept.
Scala version
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("CollectAsMapScala").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array((1,2),(1,4),(1,3),(2,3),(3,5)),1)
println(rdd.collectAsMap())
//Map(2 -> 3, 1 -> 3, 3 -> 5)
Java version
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
SparkConf conf = new SparkConf().setAppName("CollectAsMapJava").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Tuple2<Integer, Integer>> tupleRDD =
sc.parallelize(Arrays.asList(new Tuple2<Integer, Integer>(1, 2),
new Tuple2<Integer, Integer>(2, 4),
new Tuple2<Integer, Integer>(2, 5),
new Tuple2<Integer, Integer>(3, 4),
new Tuple2<Integer, Integer>(3, 5),
new Tuple2<Integer, Integer>(3, 6)));
// Convert to a JavaPairRDD, as in the countByKey example above
JavaPairRDD<Integer, Integer> mapRDD = JavaPairRDD.fromJavaRDD(tupleRDD);
System.out.println(mapRDD.collectAsMap());
//{2=5, 1=2, 3=6}
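The last-value-wins behavior can likewise be sketched without Spark: inserting the pairs into a map in order means each put for a duplicate key overwrites the previous value, which is the effect collectAsMap has on the collected pairs. The class name CollectAsMapDemo is illustrative only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CollectAsMapDemo {
    // Build a map from the pairs in order; put() overwrites existing keys,
    // so the last value seen for a duplicate key wins
    public static <K, V> Map<K, V> collectAsMap(List<Map.Entry<K, V>> pairs) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            result.put(pair.getKey(), pair.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> pairs = Arrays.asList(
                new SimpleEntry<>(1, 2),
                new SimpleEntry<>(1, 4),
                new SimpleEntry<>(1, 3),
                new SimpleEntry<>(2, 3),
                new SimpleEntry<>(3, 5));
        System.out.println(collectAsMap(pairs)); // {1=3, 2=3, 3=5}
    }
}
```

In Spark itself, "last" depends on the order in which partitions are collected, so with more than one partition the surviving value for a duplicated key may not be deterministic; the examples above use data where the outcome is unambiguous.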