This post records some simple Spark operations, such as deduplication, union, and intersection. Whether or not they ever come in handy, they are archived here for reference.
distinct: remove duplicates
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * Removes duplicate elements. Note that this involves a shuffle, so it is expensive.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestDistinct {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = Arrays.asList(1, 1, 2, 3, 4, 5);
        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        List<Integer> results = originRDD.distinct().collect();
        System.out.println(results);
    }
}
The result is [4, 1, 3, 5, 2]; note the element order is not preserved across the shuffle.
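Since distinct shuffles the data, it can be worth controlling the shuffle parallelism explicitly via the distinct(int numPartitions) overload. A minimal sketch, continuing from the example above (the partition count 2 is an arbitrary illustrative value):

// Same deduplication, but with an explicit number of partitions for the shuffle.
List<Integer> tuned = originRDD.distinct(2).collect();
System.out.println(tuned);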
union: merge, without deduplication
This simply concatenates two RDDs into one.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * Merges two RDDs.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestUnion {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        List<Integer> results = oneRDD.union(twoRDD).collect();
        System.out.println(results);
    }
}
The result is [1, 2, 3, 4, 5, 1, 6, 7, 8, 9]; note the duplicate 1 is kept.
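Because union keeps duplicates, a set-style union takes an explicit distinct afterwards. A short sketch reusing oneRDD and twoRDD from above (this re-introduces the shuffle cost of distinct):

// Chaining distinct drops the duplicate 1; the result is
// {1, 2, 3, 4, 5, 6, 7, 8, 9} in nondeterministic order.
List<Integer> setUnion = oneRDD.union(twoRDD).distinct().collect();
System.out.println(setUnion);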
intersection: take the intersection
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * Returns the intersection of two RDDs.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestIntersection {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        List<Integer> results = oneRDD.intersection(twoRDD).collect();
        System.out.println(results);
    }
}
The result is [1].
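Note that intersection also removes duplicates within each input, and like distinct it involves a shuffle. A small sketch illustrating the deduplication (dupA and dupB are hypothetical inputs, reusing javaSparkContext from above):

// 1 appears twice in both inputs but only once in the result.
JavaRDD<Integer> dupA = javaSparkContext.parallelize(Arrays.asList(1, 1, 2));
JavaRDD<Integer> dupB = javaSparkContext.parallelize(Arrays.asList(1, 1, 3));
System.out.println(dupA.intersection(dupB).collect()); // prints [1]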
subtract
RDD1.subtract(RDD2) returns the elements that appear in RDD1 but not in RDD2, without deduplicating.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestSubtract {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        List<Integer> results = oneRDD.subtract(twoRDD).collect();
        System.out.println(results);
    }
}
The result is [2, 3, 4, 5].
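"Without deduplicating" means duplicates on the left side survive as long as the value is absent from the right side. A small sketch (left and right are hypothetical inputs, reusing javaSparkContext from above):

// 2 appears twice in the left RDD and not at all in the right RDD,
// so it appears twice in the result, e.g. [2, 2, 3].
JavaRDD<Integer> left = javaSparkContext.parallelize(Arrays.asList(1, 2, 2, 3));
JavaRDD<Integer> right = javaSparkContext.parallelize(Arrays.asList(1));
System.out.println(left.subtract(right).collect());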
cartesian: return the Cartesian product
The Cartesian product is every pairwise combination of the two datasets, and it is very expensive. For example, if A is ["a","b","c"] and B is ["1","2","3"], the Cartesian product is (a,1) (a,2) (a,3) (b,1) (b,2) (b,3) (c,1) (c,2) (c,3).

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Returns the Cartesian product; very expensive.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestCartesian {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3);
        List<Integer> two = Arrays.asList(1, 4, 5);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        List<Tuple2<Integer, Integer>> results = oneRDD.cartesian(twoRDD).collect();
        System.out.println(results);
    }
}
Note that what comes back is a list of key-value pairs (Tuple2).
[(1,1), (1,4), (1,5), (2,1), (2,4), (2,5), (3,1), (3,4), (3,5)]
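In practice the Tuple2 pairs are usually transformed right away rather than collected as-is. A minimal sketch, continuing with oneRDD and twoRDD from the example above (the "a-b" string format is just for illustration):

// cartesian returns a JavaPairRDD; map over the Tuple2 values,
// reading the two elements with _1() and _2().
List<String> formatted = oneRDD.cartesian(twoRDD)
        .map(t -> t._1() + "-" + t._2())
        .collect();
System.out.println(formatted);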