spark中cogroup用法

spark中cogroup用法
cogroup:对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。

例一

[root@node111 ~]# spark-shell
28 一月 10:20:56 WARN [util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://node111:4040
Spark context available as 'sc' (master = spark://mycluster:7077, app id = app-20190128102110-0031).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.
//定义集合
scala> val DBName=Array(Tuple2(1,"Spark"),Tuple2(2,"Hadoop"),Tuple2(3,"Kylin"),Tuple2(4,"Flink"))
DBName: Array[(Int, String)] = Array((1,Spark), (2,Hadoop), (3,Kylin), (4,Flink))

scala> val numType=Array(Tuple2(1,"String"),Tuple2(2,"int"),Tuple2(3,"byte"),Tuple2(4,"bollean"),Tuple2(5,"float"),Tuple2(1,"34"),Tuple2(2,"45"),Tuple2(3,"75"))
numType: Array[(Int, String)] = Array((1,String), (2,int), (3,byte), (4,bollean), (5,float), (1,34), (2,45), (3,75))
//集合并行化为RDD
scala> val names=sc.parallelize(DBName)
names: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val types=sc.parallelize(numType)
types: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:26
//显示原始两个RDD中的数据内容
scala> names.collect.foreach(println)
(1,Spark)
(2,Hadoop)
(3,Kylin)
(4,Flink)

scala> types.collect.foreach(println)
(1,String)
(2,int)
(3,byte)
(4,bollean)
(5,float)
(1,34)
(2,45)
(3,75)

//cogroup用法
scala> val nameAndType=names.cogroup(types)
nameAndType: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[3] at cogroup at <console>:27
//显示集合内容
scala> nameAndType.collect.foreach(println)
(1,(CompactBuffer(Spark),CompactBuffer(34, String)))                            
(2,(CompactBuffer(Hadoop),CompactBuffer(45, int)))
(3,(CompactBuffer(Kylin),CompactBuffer(75, byte)))
(4,(CompactBuffer(Flink),CompactBuffer(bollean)))
(5,(CompactBuffer(),CompactBuffer(float)))

//过滤用法
scala> val commonRdd=nameAndType.filter(t=>t._2._1.iterator.hasNext && t._2._2.iterator.hasNext);
commonRdd: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[4] at filter at <console>:25
//再次显示过滤后集合内容
scala> commonRdd.collect.foreach(println)
(1,(CompactBuffer(Spark),CompactBuffer(String, 34)))
(2,(CompactBuffer(Hadoop),CompactBuffer(int, 45)))
(3,(CompactBuffer(Kylin),CompactBuffer(byte, 75)))
(4,(CompactBuffer(Flink),CompactBuffer(bollean)))

scala> 

例二
spark中关于cogroup的定义:

/**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
    fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))

使用的例子

private JavaPairRDD<GeoWaveInputKey, ByteArrayId> joinAndCompareTiers(
			JavaPairRDD<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>> leftTier,
			JavaPairRDD<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>> rightTier,
			Broadcast<GeomFunction> geomPredicate,
			int highestPartitionCount, 
			HashPartitioner partitioner ) {
		// Cogroup groups on same tier ByteArrayId and pairs them into Iterable
		// sets.
		JavaPairRDD<
		     ByteArrayId, 
		     Tuple2<
		             Iterable<Tuple2<GeoWaveInputKey, Geometry>>, 
		             Iterable<Tuple2<GeoWaveInputKey, Geometry>>
		      >
		> joinedTiers = leftTier
				.cogroup(
						rightTier,
						partitioner);
		
		
		// Filter only the pairs that have data on both sides, bucket strategy
		// should have been accounted for by this point.
		// We need to go through the pairs and test each feature against each
		// other
		// End with a combined RDD for that tier.
		joinedTiers = joinedTiers.filter(t -> 
				t._2._1.iterator().hasNext() &&
				t._2._2.iterator().hasNext()
		);

		
		JavaPairRDD<GeoWaveInputKey, ByteArrayId> finalMatches = joinedTiers.flatMapValues(
		(Function<
		      Tuple2<
		            Iterable<Tuple2<GeoWaveInputKey, Geometry>>, 
		            Iterable<Tuple2<GeoWaveInputKey, Geometry>>
		      >, Iterable<GeoWaveInputKey>
		>) t -> {
            GeomFunction predicate = geomPredicate.value();

            HashSet<GeoWaveInputKey> results = Sets.newHashSet();
            for (Tuple2<GeoWaveInputKey, Geometry> leftTuple : t._1) {
                for (Tuple2<GeoWaveInputKey, Geometry> rightTuple : t._2) {
                    if (predicate.call(
                            leftTuple._2,
                            rightTuple._2)) {
                        results.add(leftTuple._1);
                        results.add(rightTuple._1);
                    }
                }
            }
            return results;
        }) .mapToPair(Tuple2::swap)
        .reduceByKey(partitioner,  (id1, id2) -> id1)
        .persist(StorageLevel.MEMORY_ONLY_SER());
		
		return finalMatches;
	}

猜你喜欢

转载自blog.csdn.net/hsg77/article/details/86673202