// Word-count demo exercising reduceByKey, combineByKey, zip, and
// repartition/coalesce on a local Spark context.
val conf = new SparkConf().setAppName("ForeachDemo").setMaster("local")
val sc = new SparkContext(conf)

val rdd1 = sc.parallelize(List("hello tom", "hello jerry", "hello tom", "hello world"), 2)

// Approach 1: word count with reduceByKey.
println(rdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().toBuffer)
// ArrayBuffer((tom,2), (hello,4), (jerry,1), (world,1))

/**
 * Approach 2: word count with combineByKey.
 *
 * def combineByKey[C](
 *     createCombiner: V => C,        // builds the combiner from the FIRST value seen for a key in a partition
 *     mergeValue: (C, V) => C,       // folds each further value into the partition-local combiner
 *     mergeCombiners: (C, C) => C)   // merges the per-partition combiners into the final result
 */
val rdd2 = rdd1.flatMap(_.split(" ")).map((_, 1))
  .combineByKey(x => x, (a: Int, b: Int) => a + b, (c: Int, d: Int) => c + d)
println(rdd2.collect().toBuffer) // ArrayBuffer((tom,2), (hello,4), (jerry,1), (world,1))

// Here createCombiner adds 10 to the first value of each key in EACH
// partition, so every (key, partition) pair contributes an extra 10.
val rdd3 = rdd1.flatMap(_.split(" ")).map((_, 1))
  .combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (c: Int, d: Int) => c + d)
println(rdd3.collect().toBuffer) // ArrayBuffer((tom,22), (hello,24), (jerry,11), (world,11))

val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3)
val rdd6 = rdd5.zip(rdd4)
println(rdd6.collect().toBuffer)
// ArrayBuffer((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

// Collect the names sharing the same number into one List:
//   List(_)                       -- wrap the first value per key into a List
//   (x: List[String], y: String)  -- append further values within a partition
//   (m, n) => m ++ n              -- concatenate the per-partition lists
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y,
  (m: List[String], n: List[String]) => m ++ n)
println(rdd7.collect().toBuffer)
// ArrayBuffer((1,List(dog, cat, turkey)), (2,List(gnu, salmon, rabbit, wolf, bear, bee)))

// repartition(n) is exactly coalesce(n, shuffle = true).
// BUG in the original: it wrote `rdd7.repartition(3) == rdd7.coalesce(3, true)`,
// but `==` on two distinct RDD objects is reference equality and is always
// false — it never demonstrated the equivalence. Build both explicitly instead.
val repartitioned = rdd7.repartition(3)
val coalesced = rdd7.coalesce(3, shuffle = true)
// collectAsMap: bring a pair RDD back to the driver as a Map.
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
val rdd10 = rdd.collectAsMap
println(rdd10)

val rdd11 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
// countByKey counts occurrences of each KEY ("a", "b", "c").
println(rdd11.countByKey)   // Map(a -> 1, b -> 2, c -> 2)
// countByValue counts occurrences of each whole ELEMENT, e.g. ("a",1).
println(rdd11.countByValue) // Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)

// -------------------------------------------------------------------------
// filterByRange keeps pairs whose key lies in [lower, upper] — both bounds
// inclusive per the Spark API; there is simply no "b" key in this data.
val rdd12 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd13 = rdd12.filterByRange("b", "d")
println(rdd13.collect().toBuffer) // ArrayBuffer((c,3), (d,4), (c,2))

// -------------------------------------------------------------------------
// flatMapValues expands each value into many values, repeating the key.
val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
println(a.flatMapValues(_.split(" ")).collect().toBuffer) // ArrayBuffer((a,1), (a,2), (b,3), (b,4))

// -------------------------------------------------------------------------
// foldByKey is reduceByKey with an explicit zero value per key.
val rdd14 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd15 = rdd14.map(x => (x.length, x))
println(rdd15.foldByKey("")(_ + _).collect().toBuffer) // ArrayBuffer((4,wolfbear), (3,dogcat))

// -------------------------------------------------------------------------
// foreachPartition runs a side-effecting function once per partition; it is
// an action and yields no new RDD.
val rdd16 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd16.foreachPartition(part => println(part.reduce(_ + _)))
// -------------------------------------------------------------------------
// keyBy builds (f(elem), elem) pairs: the function result becomes the key,
// the original element stays as the value.
val rdd17 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
println(rdd17.keyBy(_.length).collect().toBuffer)
// ArrayBuffer((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

// -------------------------------------------------------------------------
// keys / values project the two halves of a pair RDD.
val rdd18 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd19 = rdd18.map(w => (w.length, w))
println(rdd19.keys.collect.toList)   // List(3, 5, 4, 3, 7, 5)
println(rdd19.values.collect.toList) // List(dog, tiger, lion, cat, panther, eagle)
spark01-算子练习03
猜你喜欢
转载自blog.csdn.net/oracle8090/article/details/78776620
今日推荐
周排行