文章目录
RDD转换算子 之 聚合算子
- 聚合算子可以说是Spark计算里面的核心,所以搞懂底层的实现很有必要。
reduceByKey
说明
- 可以将数据按照相同的key对value进行聚合
/**
* 使用一个关联与交换的reduce函数来合并每个key的values值。
* 在将结果发送到reducer之前,这也将在每个mapper上本地执行合并,类似于MapReduce中的combiner。
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
// 不会对第一个value进行处理,分区内和分区间计算规则相同
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
/**
* 输出将使用numPartitions分区,进行哈希分区。
*/
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
/**
* 输出将使用现有的分区器或是并行级别来进行哈希分区。
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
案例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("partition by")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(
("a", 1), ("b", 2),
("c", 3), ("d", 4),
("a", 5), ("b", 6)
)
)
// 使用现有的分区器或是并行级别来进行哈希分区
val rdd1: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
println(rdd1.collect().mkString(", "))
// 使用numPartitions分区,进行哈希分区
val rdd2: RDD[(String, Int)] = rdd.reduceByKey(_ + _, 2)
println(rdd2.collect().mkString(", "))
// 使用指定分区器进行分区
val rdd3: RDD[(String, Int)] = rdd.reduceByKey(new HashPartitioner(2), _ + _)
println(rdd3.collect().mkString(", "))
}
reduceByKey进一步调用:
reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
-- combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
查看combineByKeyWithClassTag方法:
/**
* 泛型函数,使用一组自定义的聚合函数,来组合每个key的元素。将RDD[(K, V)]转换为RDD[(K, C)]类型的结果,得到一个“组合类型”C。
*
* 用户提供三个函数:
* - `createCombiner`, 将V转换为C(比如,创建只有一个元素的list)// 将计算的第一个值转换结构
* - `mergeValue`, 将V合并成C(比如,将它添加到list的末尾)// 分区内的计算规则
* - `mergeCombiners`, 将两个C组合成一个 // 分区间的计算规则
*
* 此外,用户可以控制输出RDD的分区,以及是否执行map端的聚合(如果一个mapper可以使用同一个键生成多个项)
*
* 注意:V和C可以不同 -- 例如,可以将类型(Int, Int)的RDD分组为类型(Int, Seq[Int])的RDD
*/
def combineByKeyWithClassTag[C](
createCombiner: V => C, // 将计算的第一个值转换结构
mergeValue: (C, V) => C, // 分区内的计算规则
mergeCombiners: (C, C) => C, // 分区间的计算规则
partitioner: Partitioner, // 控制输出RDD的分区
mapSideCombine: Boolean = true, // 是否执行map端的聚合
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner) // 后续有专门文章来讲解shuffle机制
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
groupByKey
说明
- 将分区的数据直接转换为相同类型的内存数组进行后续处理
- 相同的key的value值位于同一个分区,同一个分区中包含1~多个key
- reduceByKey和groupByKey的区别
- reduceByKey:存在预聚合,性能较高
- groupByKey:必须能够在内存中保存任何key的所有key-value对。如果一个key的value值太多,则可能导致“OutOfMemoryError”。
/**
* 将RDD中每个key的values值分组为一个序列。
* 哈希分区用现有的分区器或并行级别对结果RDD进行分区。
* 不保证每个组中元素的顺序,甚至在每次计算结果RDD时可能会有所不同。
*
* 注意:这个操作可能会很昂贵。如果为了对每个key执行一次聚合(如sum或average)而进行分组,
* 请使用`PairRDDFunctions.aggregateByKey`或`PairRDDFunctions.reduceByKey`将提供更好的性能。
*/
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
/**
* 允许通过传递分区器来控制结果key-value对RDD的分区。
*
* 注意:正如当前实现的那样,groupByKey必须能够在内存中保存任何key的所有key-value对。
* 如果一个key的value值太多,则可能导致“OutOfMemoryError”
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey不应该使用map-side combine,因为map-side combine不会减少被洗牌的数据量,
// 并且要求将所有map-side数据插入哈希表,从而在老年代中产生更多的对象。
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
* 哈希分区将结果RDD分到“numPartitions”个分区中。
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}
案例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("partition by")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(
("a", 1), ("b", 2),
("c", 3), ("d", 4),
("a", 5), ("b", 6)
)
)
// 哈希分区用现有的分区器或并行级别对结果RDD进行分区。
val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
println(rdd1.collect().mkString(", "))
// 哈希分区将结果RDD分到“numPartitions”个分区中。
val rdd2: RDD[(String, Iterable[Int])] = rdd.groupByKey(2)
println(rdd2.collect().mkString(", "))
// 允许通过传递分区器来控制结果key-value对RDD的分区。
val rdd3: RDD[(String, Iterable[Int])] = rdd.groupByKey(new HashPartitioner(2))
println(rdd3.collect().mkString(", "))
}
aggregateByKey
说明
- 可以实现分区内计算和分区间计算使用不同的规则
/**
* 使用给定的combine函数和中性"zero value"聚合每个key的value值。
* 此函数可以返回与此RDD V中的value类型不同的结果类型U。
* 因此,我们需要一个操作来将V合并为U,一个操作来合并两个U,就像在scala.TraversableOnce中一样。
* 前者用于合并分区内的值,后者用于合并分区间的值。
* 为了避免内存分配,允许这两个函数修改并返回它们的第一个参数,而不是创建一个新的U。
*/
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// 将zero value序列化为字节数组,以便我们可以在每个key上获得它的新克隆
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
// 我们稍后将在'combineByKey'中清理combiner闭包
val cleanedSeqOp = self.context.clean(seqOp)
// 将初始值和第一个value使用分区内计算规则
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner) // 默认mapSideCombine: Boolean = true,
}
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
案例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("partition by")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(
("a", 1), ("b", 2),
("c", 3), ("d", 4),
("a", 5), ("b", 6)
), 2
)
// todo 将数据根据不同的规则进行分区内计算和分区间计算
// 取出每个分区内相同key的最大值,然后分区间相加
val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_,_), _+_)
println(rdd1.collect().mkString(", "))
val rdd2: RDD[(String, Int)] = rdd.aggregateByKey(0, 2)(math.max(_,_), _+_)
println(rdd2.collect().mkString(", "))
val rdd3: RDD[(String, Int)] = rdd.aggregateByKey(0, new HashPartitioner(2))(math.max(_,_), _+_)
println(rdd3.collect().mkString(", "))
}
foldByKey
说明
- 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
/**
* 使用一个关联函数和一个中性的“zero value”合并每个key的values值,该“zero value”可以添加到结果中任意次数,并且不能更改结果。
* “zero value”举例:Nil表示列表连接,0表示加法,1表示乘法
*/
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
// 将“zero value”序列化为byte数组,以便我们可以在每个key上获得它的新克隆
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
// 反序列化时,使用lazy val为每个任务仅创建一个序列化实例
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
val cleanedFunc = self.context.clean(func)
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
cleanedFunc, cleanedFunc, partitioner)
}
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, defaultPartitioner(self))(func)
}
案例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("partition by")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(
("a", 1), ("b", 2),
("c", 3), ("d", 4),
("a", 5), ("b", 6)
), 2
)
rdd.foldByKey(0)(_+_)
rdd.foldByKey(0, 2)(_+_)
rdd.foldByKey(0, new HashPartitioner(2))(_+_)
}
combineByKey
说明
- 最通用的对key-value型RDD进行聚合操作的聚合函数(aggregation function)。
- 类似于aggregateByKey(),combineByKey允许用户返回值的类型与输入不一致。
/**
* 泛型函数,使用一组自定义聚合函数组合每个key的元素。此方法用于向后兼容。它不向shuffle提供combiner类标记信息。
* 请参见 `combineByKeyWithClassTag`
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}
/**
* combineByKeyWithClassTag的简化版本,它对输出RDD进行哈希分区
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}
/**
* 它使用现有的分区器/并行级别对结果RDD进行哈希分区。
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
案例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("partition by")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),
2
)
// 求每个key的平均值
val combineRDD: RDD[(String, (Int, Int))] = rdd.combineByKey((_, 1), {
case ((num1: Int, cnt: Int), num2: Int) => (num1 + num2, cnt + 1)
}, {
case ((num1: Int, cnt1: Int), (num2: Int, cnt2: Int)) => (num1 + num2, cnt1 + cnt2)
})
val resRDD: RDD[(String, Int)] = combineRDD.map {
case (key, (num, cnt)) => (key, num / cnt)
}
println(resRDD.collect().mkString(", "))
}
聚合算子小结
- 底层都调用了combineByKeyWithClassTag
def combineByKeyWithClassTag[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]
有预聚合
- mapSideCombine = true,进行预聚合
- 从源码角度来讲,四个算子的底层逻辑相同。
reduceByKey
函数签名:
- def reduceByKey(func: (V, V) => V): RDD[(K, V)]
- def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
- def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
不会对第一个value进行处理,分区内和分区间计算规则相同
aggregateByKey
函数签名:
- def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
- def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
- def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
会将初始值和第一个value使用分区内计算规则进行计算
foldByKey
函数签名:
- def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
- def foldByKey( zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
分区内和分区间计算规则相同,初始值和第一个value使用分区内计算规则
combineByKey
函数签名:
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
第一个参数就是对第一个value进行处理,所以无需初始值
无预聚合
- mapSideCombine = false,不进行预聚合
groupByKey
函数签名:
- def groupByKey(): RDD[(K, Iterable[V])]
- def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
- def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]