概述
本节介绍Spark Job对象的实现原理。
Job的基本概念
在《spark2原理分析-Job执行框架概述》一文中介绍了Job的执行框架的基本概念,本节详细介绍其中的Job对象的实现原理。
上面的文章已经介绍过了,Job是由于执行Action操作函数产生的。Job是提交给 DAGScheduler(Job调度器)来计算action操作结果的顶层工作单元(计算单元)。在Spark代码实现层面,Job的实现类是ActiveJob。Job可以有两种类型:
-
result Job(结果Job)
此类Job通过计算并创建一个ResultStage实体来执行action操作。 -
map-stage Job(映射阶段Job)
此类Job会在任何下游的Stage提交之前,为ShuffleMapStage计算map操作的输出。此类Job用于自适应查询计划,在提交后续阶段之前查看map操作的输出统计信息。
Spark使用ActiveJob类的finalStage字段来区分这两种类型的Job。
ActiveJob
该类代表一个在DAGScheduler运行的Job实体。该类的实现如下:
private[spark] class ActiveJob(
val jobId: Int,
val finalStage: Stage,
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {
val numPartitions = finalStage match {
case r: ResultStage => r.partitions.length
case m: ShuffleMapStage => m.rdd.partitions.length
}
/** Which partitions of the stage have finished */
val finished = Array.fill[Boolean](numPartitions)(false)
var numFinished = 0
}
该类的实现代码很简洁,下面介绍一个该类的各个成员的意义:
成员名 | 说明 |
---|---|
jobId | 独一无二的Job的Id |
finalStage | 该Job计算的Stage,可以是:执行action的ResultStage;或submitMapStage的ShuffleMapStage。 |
callSite | CallSite类型的实体,用户应用程序在这里进行Job的初始化 |
listener | 监听Job结束或Job失败的事件 |
properties | 与Job调度相关的属性 |
numPartitions | 需要为该Job计算的分区数量 |
finished | 是一个Boolean的数组,用来表示在该Job中计算完成的分区 |
numFinished | 在该Job中已经计算完成的分区数量 |
Job的计算
计算Job等同于计算正在执行操作的RDD的分区(Partition)。Job中的分区数取决于阶段Stage的类型 - ResultStage或ShuffleMapStage。
作业Job以单个目标的RDD开始,但最终可以包括其他RDD,这些RDD都是目标RDD的血缘关系图中(lineage graph)的一部分。
父阶段是ShuffleMapStage的实例。
但要注意,在actions操作中并不总是为ResultStages计算所有分区,比如:first()和lookup()之类的操作。
Job会跟踪已经计算了多少分区,使用以下变量:
val finished = Array.fill[Boolean](numPartitions)(false)
Map-stage job
此类Job会在提交任何下游Stage之前,为ShuffleMapStage(对于submitMapStage)计算map的输出文件。
此类Job还用于自适应查询计划/自适应调度,以在提交后续阶段之前查看map输出的统计信息。
Result job
计算ResultStage,来执行action操作函数。
总结
本节介绍了Job的实现类,并对其实现原理进行了分析。