一、基础转换操作
- map 对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD,原RDD中的元素在新RDD中都有且只有一个元素与之对应。
public static void mapTest(JavaSparkContext sc) {
List<String> words = Arrays.asList("hello", "world");
JavaRDD<String> wordsRDD = sc.parallelize(words);
// map操作为每个单词赋值
JavaRDD<Tuple2<String, Integer>> wordCountRDD = wordsRDD
.map(new Function<String, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 4883828149185152684L;
public Tuple2<String, Integer> call(String v1) throws Exception {
return new Tuple2<String, Integer>(v1, 1);
}
});
// 遍历每个单词,并打印其出现的次数
wordCountRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 4892545561975184834L;
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println("单词:" + t._1 + ",次数:" + t._2);
}
});
}
- distinct 去除RDD重复元素,返回所有元素不重复的RDD。
public static void distinctTest(JavaSparkContext sc) {
List<Integer> nums = Arrays.asList(1, 2, 2, 3, 5);
JavaRDD<Integer> numsRDD = sc.parallelize(nums);
// distinct操作去重
JavaRDD<Integer> distinceNumsRDD = numsRDD.distinct();
distinceNumsRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 647204360041943265L;
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
- flatMap 首先进行flat扁平化然后在进行map操作。
public static void flatMapTest(JavaSparkContext sc) {
List<String> words = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD<String> wordsRDD = sc.parallelize(words, 3);
JavaRDD<String> splitedRDD = wordsRDD.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 840597214907231645L;
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
splitedRDD.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 4032309929532415386L;
public void call(String t) throws Exception {
System.out.println("单词:"+t);
}
});
}
- coalesce 对RDD根据指定的分区数进行重分区,第二参数指定是否进行shuffle;注意:如果指定分区数小于原分区,则可以顺利进行,但是如果大于原分区数,必须指定shuffle参数为true,否则分区不会改变。
- repartition也是对根据指定的分区数进行重分区,但是第二个参数默认就是true,也就是默认就需要shuffle操作
public static void repartitionTest(JavaSparkContext sc) {
List<String> words = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD<String> wordsRDD = sc.parallelize(words, 3);
System.out.println("重新分区前的分区数:" + wordsRDD.partitions().size());
JavaRDD<String> coalesce1RDD = wordsRDD.coalesce(2);
System.out.println("将分区数设置为2后的分区数:"+coalesce1RDD.partitions().size());
JavaRDD<String> coalesce2RDD = wordsRDD.coalesce(4);
System.out.println("将分区数设置为4且不指定shuffle后的分区数:"+coalesce2RDD.partitions().size());
JavaRDD<String> coalesce3RDD = wordsRDD.coalesce(4,true);
System.out.println("将分区数设置为4且指定shuffle后的分区数:"+coalesce3RDD.partitions().size());
JavaRDD<String> repartitionRDD = wordsRDD.repartition(4);
System.out.println("repartition后的分区数:" + repartitionRDD.partitions().size());
}
- randomSplit(weights:Array[Double], seed:Long=Utils.random.nextLong):Array[RDD[T]] 根据weights权重将一个RDD分割为多个RDD。
public static void randomSplit(JavaSparkContext sc) {
List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numsRDD = sc.parallelize(nums, 10);
// 权重的和为1
double[] weights = new double[] {0.1,0.2,0.3,0.4};
JavaRDD<Integer>[] RDDs = numsRDD.randomSplit(weights);
int index = 1;
for (JavaRDD<Integer> rdd : RDDs) {
System.out.println("第"+(index++)+"个RDD");
rdd.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 510345733961440792L;
public void call(Integer t) throws Exception {
System.out.print(t+" ");
}
});
}
}
- glom():RDD[Array[T]] 将RDD中每一个分区所有类型为T的数据转变成元素类型为T的数组Array[T]
public static void glomTest(JavaSparkContext sc) {
List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numsRDD = sc.parallelize(nums, 3);
JavaRDD<List<Integer>> glomRDD = numsRDD.glom();
glomRDD.foreach(new VoidFunction<List<Integer>>() {
private static final long serialVersionUID = -3717022904720402200L;
public void call(List<Integer> t) throws Exception {
System.out.println(t.toString());
}
});
}
- union(other:RDD[T]):RDD[T] RDD进行合并,返回两个RDD的并集,返回元素不去重
- intersection(other:RDD[T]):RDD[T] 返回两个RDD的交集,类似于SQL的inner join,返回元素去重
- intersection(other:RDD[T],numPartitions:Int):RDD[T]
- intersection(other:RDD[T],partitioner:Partitioner):RDD[T]
- subtract(other:RDD[T]):RDD[T] 返回在RDD中出现但是不在other RDD中出现的元素,返回元素不去重
- subtract(other:RDD[T],numPartitions:Int):RDD[T]
- subtract(other:RDD[T],partitioner:Partitioner):RDD[T]
public static void unionTest(JavaSparkContext sc) {
List<Integer> num1s = Arrays.asList(1,2,3,4,5);
List<Integer> num2s = Arrays.asList(2,3,4,6,7);
JavaRDD<Integer> num1sRDD = sc.parallelize(num1s);
JavaRDD<Integer> num2sRDD = sc.parallelize(num2s);
JavaRDD<Integer> unionRDD = num1sRDD.union(num2sRDD);
System.out.println("union(并集不去重)操作返回:"+unionRDD.collect().toArray());
JavaRDD<Integer> intersectionRDD = num1sRDD.intersection(num2sRDD);
System.out.println("intersection(交集去重)操作返回:"+intersectionRDD.collect().toArray());
JavaRDD<Integer> subtractRDD = num1sRDD.subtract(num2sRDD);
System.out.println("subtract(差集不去重)操作返回:"+subtractRDD.collect().toArray());
}
- mapPartitions[U](f: (Iterator[T]) =>Iterator[U],preserversPartitions:BooLean = false):RDD[U]
和map操作类似,只不过映射的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,其中preserversPartitions表示是否保留父RDD的的partitions分区信息。如果在映射过程中需要频繁创建的额外对象,使用mapPartitions比操作map高效的多,比如RDD所有数据通过JDBC写入数据库,如果使用map函数可能为每一个元素都创建连接,而使用mapPartitions那么只需要对每一个分区建立一个连接。但是mapPartitions不适用于大对象,因为一下子加载到内存中容易发生内存溢出。
public static void mapPartitionsTest(JavaSparkContext sc) {
List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numsRDD = sc.parallelize(nums, 3);
// 计算每个分区的合计
JavaRDD<Integer> totalRDD = numsRDD
.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
private static final long serialVersionUID = 8064196197615893270L;
public Iterable<Integer> call(Iterator<Integer> t) throws Exception {
Integer total = 0;
while (t.hasNext()) {
total = total + t.next();
}
return Arrays.asList(total);
}
});
System.out.println(totalRDD.collect().toArray());
}
- mapPartitionsWithIndex[U](f: (Int,Iterator[T] )=> Iterator[U],preserversPartitions:BooLean = false)):RDD[U]
类似于mapPartitions,只是输入参数多了一个分区索引
public static void mapPartitionsWithIndexTest(JavaSparkContext sc) {
List<String> nums = Arrays.asList("1", "2", "3", "4", "5");
JavaRDD<String> numsRDD = sc.parallelize(nums, 2);
JavaRDD<String> totalRDD = numsRDD
.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
private static final long serialVersionUID = 2818512306761968511L;
public Iterator<String> call(Integer index, Iterator<String> nums) throws Exception {
StringBuilder builder = new StringBuilder();
while (nums.hasNext()) {
builder.append(nums.next() + "、");
}
return Arrays.asList("分区索引下标:" + index + ",其值为:" + builder.toString()).iterator();
}
}, false);
System.out.println(totalRDD.collect());
}
- zip U:RDD[(T,U)] 用于将两个RDD组合成Key/Value的形式的RDD,默认两个RDD的分区数和元素数量相同,否则抛出异常,这也是所谓的拉链操作
val rdd1 = sc.makeRDD(1 to 5,2)
val rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd1.zip(rdd2).collect
// 返回结果
res14: Array[(Int, String)] = Array((1,A), (2,B),(3,C), (4,D), (5,E))
val rdd3 = sc.makeRDD(Seq("A","B","C"),3)
rdd1.zip(rdd3).collect
// 因为两个RDD分区数不同,抛出异常。(Can't zip RDDs with unequal numbers of partitions:List(2,3))
- zipPartitions:将多个RDD按照partition组合成为新的RDD,该操作需要RDD分区数相同,但是对于每一个分区内元素数量没有限制。
- zipWithIndex():RDD[(T,Long)] 将RDD中的元素和这个元素在RDD的id索引号组合成键值对
public static void zipWithIndexTest(JavaSparkContext sc) {
List<String> strs = Arrays.asList("a", "b", "c", "d", "e");
JavaRDD<String> strsRDD = sc.parallelize(strs, 2);
JavaPairRDD<String, Long> pairRDD = strsRDD.zipWithIndex();
System.out.println(pairRDD.collect());
}
- zipWithUniqueId():RDD[(T,Long)] 将RDD的元素和一个唯一的ID组合成键值对。
该唯一的ID生成算法如下:(1)每个分区中第一个元素的唯一ID值为:该分区索引号;(2)每个分区中第N个元素的唯一ID值为:前一个元素的唯一ID值+该RDD总的分区数。
其中zipWithIndex需要启动一个Spark作业来计算每个分区的开始索引号,而zipWithUniqueId则不需要。
public static void zipWithUniqueIdTest(JavaSparkContext sc) {
List<String> strs = Arrays.asList("a", "b", "c", "d", "e");
JavaRDD<String> strsRDD = sc.parallelize(strs, 2);
JavaPairRDD<String, Long> pairRDD = strsRDD.zipWithUniqueId();
System.out.println(pairRDD.collect());
}
二、键值转换操作
- partitionBy(p:Partitioner):RDD[(K,V)] 根据Partition函数生成新的ShuffleRDD,将原RDD重新分区
- mapValues[U]:(f:(V)=>U):RDD[(K,V)] 类似于map只不过是针对[K,V]中的value值进行map操作
- flatMapValues[U]:(f:(V)=>TraversableOnce[U]):RDD[(K,V)] 类似于flatMap操作,只不过是针对[K,V]中value值进行flatMap操作
public static void mapValuesTest(JavaSparkContext sc) {
List<String> strs = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD<String> strsRDD = sc.parallelize(strs);
JavaRDD<String> splitedRDD = strsRDD.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = -984130321206766818L;
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
JavaPairRDD<Integer, String> pairRDD = splitedRDD
.mapToPair(new PairFunction<String, Integer, String>() {
private static final long serialVersionUID = 2043541493697396334L;
public Tuple2<Integer, String> call(String t) throws Exception {
return new Tuple2<Integer, String>(1, t);
}
});
JavaPairRDD<Integer, String> resultRDD = pairRDD.mapValues(new Function<String, String>() {
private static final long serialVersionUID = 924538234523756151L;
public String call(String value) throws Exception {
return value.toUpperCase();
}
});
System.out.println(resultRDD.collect());
}
- reduceByKey(func:(V,V) => V):RDD[(K,V)]
- reduceByKey(func:(V,V) => V,numPartitions:Int):RDD[(K,V)]
- reduceByKey(p:Partitioner,func:(V,V) =>V):RDD[(K,V)]
- reduceByKeyLocally(func:(V,V) => V):Map[(K,V)]
reduceByKey:用于将RDD[K,V]中每一个K对应的V值根据映射函数进行计算,说白了就对相同的key的value进行reduce操作,内部其实调用的是combineByKey,numPartitions用于指定分区;reduceByKeyLocally将运算结果映射到一个Map中,而不是RDD。
public static void reduceByKeyTest(JavaSparkContext sc) {
List<String> strs = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD<String> strsRDD = sc.parallelize(strs);
JavaRDD<String> splitedRDD = strsRDD.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = -984130321206766818L;
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
JavaPairRDD<String, Integer> pairRDD = splitedRDD
.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 2043541493697396334L;
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
});
JavaPairRDD<String, Integer> resultRDD = pairRDD
.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 4852162726837426718L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(resultRDD.collect());
}
- groupByKey():RDD[(K,Iterable[V])]
- groupByKey(numPartitions:Int):RDD[(K,Iterable[V])]
- groupByKey(p:Partitioner):RDD[(K,Iterable[V])]
groupByKey:用于将RDD[K,V]中每一个K对应的V值合并到一个集合Iterable[V]中,也就是根据key进行分组。
public static void groupByKeyTest(JavaSparkContext sc) {
List<String> strs = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD<String> strsRDD = sc.parallelize(strs);
JavaRDD<String> splitedRDD = strsRDD.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = -984130321206766818L;
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
JavaPairRDD<String, Integer> pairRDD = splitedRDD
.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 2043541493697396334L;
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
});
JavaPairRDD<String, Iterable<Integer>> resultRDD = pairRDD.groupByKey();
System.out.println(resultRDD.collect());
}
- cogroup 相当于SQL语句中全外关联,返回左右RDD中的记录,关联不上的为空。
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A",1),("B",2),("C",3)),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("D","d")),2)
val rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2)
val rdd4 = rdd1.cogroup(rdd2,rdd3).collect
/**
* Array[(String, (Iterable[Int],Iterable[String], Iterable[String]))]
* =Array((B,(CompactBuffer(2),CompactBuffer(b),CompactBuffer())),
*(D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())),
* (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))),
*(C,(CompactBuffer(3),CompactBuffer(),CompactBuffer())),
*(E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))
*/
- join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
- join[W](other: RDD[(K, W)], numPartitions: Int):RDD[(K, (V, W))]
内连接,基于cogroup实现,将两个RDD之间相同的key进行连接,不同的抛弃掉。
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.join(rdd2).collect
//Array[(String, (String, String))] = Array((B,(2,b)), (A,(1,a)))
- leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V,Option[W]))]
- leftOuterJoin[W](other: RDD[(K, W)], numPartitions:Int): RDD[(K, (V, Option[W]))]
左外连接,基于cogroup实现,以左边RDD的key为准,进行连接,如果另一个RDD没有则为None。
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.leftOuterJoin(rdd2).collect
// Array[(String,(String, Option[String]))] = Array((B,(2,Some(b))), (A,(1,Some(a))),(C,(3,None)))
- rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K,(Option[V], W))]
- rightOuterJoin[W](other: RDD[(K, W)],numPartitions:Int): RDD[(K, (Option[V], W))]
右外连接,基于cogroup实现, 以右边RDD的key为准,进行连接,如果另一个RDD没有则为None。
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.rightOuterJoin(rdd2).collect
//Array[(String, (Option[String], String))] = Array((B,(Some(2),b)),(F,(None,f)), (A,(Some(1),a)))
- fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K,(Option[V], Option[W]))]
- fullOuterJoin[W](other: RDD[(K, W)],numPartitions:Int): RDD[(K, (Option[V], Option[W]))]
全连接,基于cogroup实现,两个RDD所有键值对都需要连接,如果另一方没有,则是None。
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.fullOuterJoin(rdd2).collect
//Array[(String, (Option[String], Option[String]))] =Array((B,(Some(2),Some(b))),
// (F,(None,Some(f))),(A,(Some(1),Some(a))), (C,(Some(3),None)))
- subtractByKey[W: ClassTag](other: RDD[(K, W)]):RDD[(K, V)]
- subtractByKey[W: ClassTag](other: RDD[(K,W)],numPartitions: Int): RDD[(K, V)]
返回第一个RDD和第二个RDD的差集,也就是第一个RDD在第二个RDD不存在的元素,比如{1,2,3,5}和{1,2,4} 由于1,2在第二个集合有,所以不反回,3和5在第二个集合没有所以返回。
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.subtractByKey(rdd2).collect
//Array[(String, String)] = Array((C,3))