大家都只知道srotByKey是一个transformation算子,而transformation类型的算子是不触发Job的,但是有心的人会注意到,在我们调用sortByKey这个算子时 ,可以从UI界面看到,sortByKey竟然会触发Job
其实sortByKey 在执行时会对分区中的数据进行取样,把取样的数据再进行收集(collect),那么此时就会触发一个Job,具体的原因可以往下看源码
以下是源码分析
如果又哪里写错了欢迎纠正
参考文章
https://blog.csdn.net/u014393917/article/details/50602047
可以先大概的跟一边代码
(1)调用sortByKey时,会new一个RangePartitioner类
(2)点进RangePartitioner中,会初始化一个rangeBounds的数组
再里面调用了sketch方法(在类中,在方法外,相当于是构造方法中的)
(3)点进sketch方法中,这里调用了一个collect方法,点进这个collect方法中,可以发现是rdd的方法,会调用runJob
上面是大概看了一遍,有兴趣的可以再向下看源码的分析
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
// new 了一个RangePartitioner, 传入分区数,
//调用方法的rdd, 排序规则,
//点入RangePartitiner方法中
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
这里直接取了一部分代码,是在new RangePartitioner 初始化时会执行的
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
//初始化一个rangeBounds, 数组类型
private var rangeBounds: Array[K] = {
if (partitions <= 1) { //如果分区的数小于等于1, 则创建一个空数组
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
//这是我们需要大致平衡输出分区的样本量,上限为1M。
//一个大约的分区的样本量,最多不超过1e6(1000000)个大约1M,
// 默认是分区个数的20倍.如果这个分区太多时,只取1e6的个数.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
//假设输入分区大体上是平衡的,并且有点过采样。
//对样本数* 3/ 分区数 ,去天花板,再取证,得到的是每个分区取样的数
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
//在每个分区中去 sampleSizePerpartition 个样, 注意sketch方法中使用了collect方法,会触发一个job
//这里可以直接看下一部分代码,看sketch方法,再下面
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
/*这里返回的sketched方法是一个数组, 数组的长度是rdd的partitions的个数,
数组中每一个元素是一个Iterator(partitionid,这个partition中总的数据条数,Array[key](
长度是样本个数,或者一个小于样本个数的值(这种情况表示partition的数据不够样本个数))),
*/
if (numItems == 0L) { //如果获取的样本数==0
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
//如果一个分区包含的项目数量远远超过平均数量,那么我们将重新对其进行采样
//sampleSize是上面定义的样本数量 通过取样的个数除上总的数据的条数,得到一个分数值.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
//这个candidates中存储有用于计算排序的key的候选人信息
val candidates = ArrayBuffer.empty[(K, Float)]
//存储超过平均值太多的分区
val imbalancedPartitions = mutable.Set.empty[Int]
//对调用sktched方法后返回的样本进行foreach
sketched.foreach { case (idx, n, sample) =>
//分值 * 每个分区样本个数 如果 大于 设定的sampleSizePerPartition 值
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx //添加到imbalancedPartitions,需要重新取样
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {//如果需要重新抽样的集合不为空
// Re-sample imbalanced partitions with the desired sampling probability.
//据需要重新进行取样的partition生成一个PartitionPruningRDD实例.
// 这个实例中只计算需要进行重新取样的partition.传入参数中的imbalancedPartitions.contains用于过滤partition
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
//重新抽样,这里的seed类似于种子取随机数的感觉
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
}
sketch方法:
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
//使用mapPartitionsWtithIndex方法,获取分区号和每个元素
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => // idx ,分区号, iter每个分区中的数据是一个迭代器
//
val seed = byteswap32(idx ^ (shift << 16))
//reservoirSampleAndCount 还返回输入大小的存储库抽样实现, 返回一个样本(数组类型),和输入的长度
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed) //意思大概就是从每个分区中取一些样本
Iterator((idx, n, sample)) //迭代器中是 分区号,每个分区的样本数,样本(数组类型)
}.collect() //先收集每个分区的样本的,collect方法会产生一个job ,这里是把取到的样本后得到的rdd进行收集,点进collect方法会发现有一个runJob
val numItems = sketched.map(_._2).sum // 这个numItems应该是总共的样本数
(numItems, sketched) //最后把样本数,一个收集好的样本Array[key]类型返回
}