union
intersection
distinct
mapToPair
groupByKey
reduceByKey
sortByKey
aggregateByKey
package com.paic.Spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import scala.collection.immutable.Map;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Created by Xlucas on 2018/4/6.
*/
public class SparkDemo2 {
public static void main(String[] args){
SparkConf conf= new SparkConf();
conf.setAppName("transformation2");
conf.setMaster("local[2]");
JavaSparkContext sc=new JavaSparkContext(conf);
sc.setLogLevel("ERROR");
JavaRDD<String> rdd1=sc.parallelize(Arrays.asList("1","2","3","4"));
JavaRDD<String> rdd2=sc.parallelize(Arrays.asList("3","4","5","6"));
//union 将2个RDD的元素合并成一个RDD,
rdd1.union(rdd2).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.print(s+" ");
}
});
System.out.println();
//intersection 操作是取2个RDD中的交集
rdd1.intersection(rdd2).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.print(s+" ");
}
});
System.out.println();
// distinct rdd中元素去重,可以带一个参数numtasks task的个数
rdd1.union(rdd2).distinct().foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.print(s+" ");
}
});
System.out.println();
//groupByKey
//在一个由(K,v)对组成的数据集上调用,返回一个(K,Seq[V])对组成的数据集。
// 默认情况下,输出结果的并行度依赖于父RDD的分区数目,
// 如果想要对key进行聚合的话,使用reduceByKey或者aggregateByKey会有更好的性能
//mapToPair
//将一个对象转换成一个key value 键-值对的模式
System.out.println(rdd1.union(rdd2).mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
}).groupByKey().collect());
//reduceByKey
//在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,
// key相同的值,都被使用指定的reduce函数聚合到一起,reduce任务的个数是可以通过第二个可选参数来配置的
//这个与 groupByKey的区别有
//1、reduceByKey 根据key进行了一次汇总,而groupByKey的value是一个seq
//2、reduceByKey 参数可以带上函数,而groupBykey 不可以带上函数
//3、reduceByKey可以用第二个参数控制reduce的个数
System.out.println(rdd1.union(rdd2).mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1+i2;
}
}).collect());
//sortByKey 模式是按照升序排的,如果要修改排序方式可以将第一个参数修改成false 那么为降序排序
System.out.println(rdd1.union(rdd2).mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}}).sortByKey(false).collect());
//aggregateByKey 类似于reduceByKey 的功能
List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<>(1, "www"));
list.add(new Tuple2<>(1, "iteblog"));
list.add(new Tuple2<>(1, "com"));
list.add(new Tuple2<>(2, "bbs"));
list.add(new Tuple2<>(2, "iteblog"));
list.add(new Tuple2<>(2, "com"));
list.add(new Tuple2<>(3, "good"));
JavaPairRDD<Integer, String> data = sc.parallelizePairs(list);
//zeroValue: U,初始值,比如空列表{} ;
//seqOp: (U,T)=> U,seq操作符,描述如何将T合并入U,比如如何将item合并到列表 ;
//combOp: (U,U) =>U,comb操作符,描述如果合并两个U,比如合并两个列表 ;
System.out.println(data.aggregateByKey(new ArrayList<String>(), new Function2<List<String>, String, List<String>>() {
@Override
public List<String> call(List<String> str, String v) throws Exception {
str.add(v);
return str;
}
}, new Function2<List<String>, List<String>, List<String>>() {
@Override
public List<String> call(List<String> str1, List<String> str2) throws Exception {
return str2;
}
}).collect());//[(2,[bbs, iteblog, com]), (1,[www, iteblog, com]), (3,[good])]
}
}