As everyone knows, a Spark job is submitted when an action is triggered. Let's find the collect operator in RDD.scala; inside it there is a runJob call:
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
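To see where this kicks in from user code, here is a minimal, self-contained sketch (the app name and master setting are my own choices for illustration): calling collect() is the action that ends up invoking sc.runJob.

import org.apache.spark.{SparkConf, SparkContext}

object CollectDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("collect-demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // map is a transformation (lazy); collect is the action that triggers
    // sc.runJob and, with it, the whole DAGScheduler flow walked through below
    val doubled: Array[Int] = sc.parallelize(1 to 10).map(_ * 2).collect()
    println(doubled.mkString(","))
    sc.stop()
  }
}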
Stepping into runJob, we find a chain of overloaded runJob calls that wrap up (rdd, func, partitions) and the other parameters; in the end it is dagScheduler that calls runJob:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
Stepping into that runJob takes us into the DAGScheduler, the heart of SparkContext's scheduling. Here we can see a call to submitJob, the DAGScheduler's own submitJob: it submits the job and returns a JobWaiter, on which the calling thread blocks until the job completes:
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
//Submit the job; this returns a JobWaiter, and the calling thread blocks until the job completes
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
//Log a different message depending on whether the job succeeded or failed
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
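To make the "blocked waiting" concrete: a JobWaiter completes a future once every partition's result has been handed to resultHandler. Below is a heavily simplified sketch of that idea (SimpleJobWaiter is a hypothetical name, not Spark's class):

import scala.concurrent.{Future, Promise}

// A much-simplified sketch of the JobWaiter idea: one promise,
// completed only after all partition results have arrived.
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val promise = Promise[Unit]()
  private var finishedTasks = 0

  // runJob awaits this future, which is what makes runJob blocking
  def completionFuture: Future[Unit] = promise.future

  def taskSucceeded(index: Int, result: U): Unit = synchronized {
    resultHandler(index, result)
    finishedTasks += 1
    if (finishedTasks == totalTasks) promise.success(()) // unblock the waiter
  }

  def jobFailed(e: Exception): Unit = promise.tryFailure(e)
}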
Next, step into submitJob. eventProcessLoop is an instance of DAGSchedulerEventProcessLoop:
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
//Check that all requested partitions exist so the tasks can actually run
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
//Take the next jobId as this job's identifier
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
//If the job has zero tasks, return an already-completed JobWaiter immediately
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
//Sanity check: at this point the partition count must be positive
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
//Construct a JobWaiter that waits for the job to finish and forwards each result to resultHandler
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//eventProcessLoop is the DAGScheduler's event queue.
//Several jobs may be in flight at once, and the loop processes events in FIFO order.
//The event posted here is JobSubmitted; eventProcessLoop's doOnReceive will
//pattern-match on the event type and eventually dispatch to dagScheduler.handleJobSubmitted
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
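Note the partitions parameter: a job does not have to cover every partition of the RDD, which is how actions like first() and take() stay cheap. A small sketch, assuming sc and an RDD[Int] named rdd already exist:

// Run the job on partition 0 only; SparkContext has a runJob overload
// that takes an explicit partitions argument alongside the full-RDD one.
val onlyFirstPartition: Array[Array[Int]] =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.toArray, Seq(0))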
Step into eventProcessLoop: new DAGSchedulerEventProcessLoop(this) creates an event loop dedicated to receiving job- and stage-related messages:
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
//Dedicated event loop for receiving job- and stage-related messages
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
DAGSchedulerEventProcessLoop is a class whose key methods are onReceive and doOnReceive. It loops over incoming events continuously, handling stage splitting, the division into ShuffleMapTasks and ResultTasks, and the various job submission and cancellation events:
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
//Note: this is where the crucial dispatch happens
doOnReceive(event)
} finally {
timerContext.stop()
}
}
//Pattern-match to determine which event was received
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
//Note: this is where stage splitting begins
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId, reason) =>
val filesLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, filesLost)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
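DAGSchedulerEventProcessLoop extends Spark's generic EventLoop: a daemon thread drains a blocking FIFO queue and hands each event to onReceive. A minimal sketch of that pattern (MiniEventLoop is a hypothetical name, not the Spark class):

import java.util.concurrent.LinkedBlockingDeque

// A minimal sketch of the EventLoop pattern: post() enqueues, a single
// daemon thread dequeues in FIFO order and dispatches to onReceive.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    override def run(): Unit =
      try {
        while (true) onReceive(eventQueue.take()) // blocks until an event arrives
      } catch {
        case _: InterruptedException => // stop() interrupts the thread to shut down
      }
  }
  eventThread.setDaemon(true)

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event)
  protected def onReceive(event: E): Unit
}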
Step into handleJobSubmitted, which first creates the ResultStage. This is where the job really starts being split into stages; once the ShuffleMapStages have been carved out, each ShuffleMapStage will in turn be split into ShuffleMapTasks. Every job consists of one ResultStage plus zero or more ShuffleMapStages. The finalRDD, the partitions, and the jobId are all passed into the final stage:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
//Create the ResultStage: this is where the submitted job actually starts being split into stages
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
//Every job consists of 1 ResultStage and 0+ ShuffleMapStages
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
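As a concrete example of the "1 ResultStage + 0 or more ShuffleMapStages" rule: in the classic word count below (the input path is a placeholder), reduceByKey introduces the only shuffle, so handleJobSubmitted produces exactly one ShuffleMapStage feeding one ResultStage.

// One shuffle boundary (reduceByKey) => one ShuffleMapStage + one ResultStage
val counts = sc.textFile("hdfs:///input.txt") // placeholder path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // shuffle: everything above becomes the ShuffleMapStage
  .collect()          // the action; collect's stage is the ResultStage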
Step into createResultStage. It creates the ResultStage's parent stages first, then passes them into new ResultStage() to build the ResultStage:
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
//Create the ResultStage's parent stages first.
//This nests calls that extract shuffle dependencies and create ShuffleMapStages
//in a loop; if there is no shuffle at all, it returns an empty list
val parents = getOrCreateParentStages(rdd, jobId)
//Increment the stage id counter to get this stage's id
val id = nextStageId.getAndIncrement()
//Build the ResultStage from the freshly created parent stages and the other core parameters
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
//Register the ResultStage under its id in stageIdToStage
stageIdToStage(id) = stage
//Update jobIds and jobIdToStageIds
updateJobIdStageIdMaps(jobId, stage)
//Return the ResultStage
stage
}
Step into getOrCreateParentStages, the method that creates the parent stages:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
//Start from getShuffleDependencies.
//It extracts only the current RDD's nearest shuffle dependencies
//(stages are split on shuffles: one job yields 1 ResultStage and 0+
//ShuffleMapStages; non-shuffle dependencies are traversed through to the
//parent RDD), iterating until shuffle dependencies are found or the lineage ends
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
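getShuffleDependencies itself is a short, stack-based traversal. A simplified sketch of the idea (nearestShuffleDeps is my own paraphrase, not the Spark source): walk narrow dependencies with an explicit stack and stop at the first shuffle on each branch.

import scala.collection.mutable
import org.apache.spark.{Dependency, ShuffleDependency}
import org.apache.spark.rdd.RDD

// Collect only the *nearest* shuffle dependencies of rdd,
// without descending past a shuffle boundary.
def nearestShuffleDeps(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = {
  val parents = mutable.HashSet.empty[ShuffleDependency[_, _, _]]
  val visited = mutable.HashSet.empty[RDD[_]]
  val waiting = mutable.Stack[RDD[_]](rdd)
  while (waiting.nonEmpty) {
    val current = waiting.pop()
    if (visited.add(current)) {
      current.dependencies.foreach {
        case shuffle: ShuffleDependency[_, _, _] => parents += shuffle // stop here
        case narrow: Dependency[_] => waiting.push(narrow.rdd)         // keep walking
      }
    }
  }
  parents.toSet
}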
Step into getOrCreateShuffleMapStage. It checks whether a ShuffleMapStage already exists for this shuffle: if one exists it is returned directly; if not, the missing stages are created. The ShuffleMapStages are built one by one from the ShuffleDependencies produced by the traversal:
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
//Look up shuffleIdToMapStage using the shuffleId taken from the ShuffleDependency
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
//If a stage already exists for this shuffle, return it directly
case Some(stage) =>
stage
//Otherwise, find all missing ancestor ShuffleDependencies and build
//their ShuffleMapStages
case None =>
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
//Create a ShuffleMapStage for each missing ancestor shuffle dependency,
//first checking that one was not created in the meantime
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a ShuffleMapStage for the given shuffle dependency itself.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
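getMissingAncestorShuffleDependencies is the counterpart that keeps going *past* shuffle boundaries. A simplified sketch, reusing the nearestShuffleDeps paraphrase from above (hasStage stands in for the shuffleIdToMapStage lookup; both names are mine):

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Collect every ancestor shuffle dependency that does not yet have a
// ShuffleMapStage, descending through shuffle boundaries as we go.
def missingAncestorShuffleDeps(
    rdd: RDD[_],
    hasStage: ShuffleDependency[_, _, _] => Boolean): Seq[ShuffleDependency[_, _, _]] = {
  val missing = mutable.ArrayBuffer.empty[ShuffleDependency[_, _, _]]
  val visited = mutable.HashSet.empty[RDD[_]]
  val waiting = mutable.Stack[RDD[_]](rdd)
  while (waiting.nonEmpty) {
    val current = waiting.pop()
    if (visited.add(current)) {
      nearestShuffleDeps(current).foreach { dep => // sketch defined earlier
        if (!hasStage(dep)) {
          missing += dep
          waiting.push(dep.rdd) // keep walking past the shuffle boundary
        }
      }
    }
  }
  missing.toSeq
}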
Step into createShuffleMapStage. This method creates ShuffleMapStages through a recursive loop:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
//The parent RDD of the ShuffleDependency
val rdd = shuffleDep.rdd
//The number of partitions, which becomes the number of tasks
val numTasks = rdd.partitions.length
//Recurse upward using the parent RDD; each call steps one level up the lineage.
//The recursion keeps going until it reaches the very first stage.
//Each ShuffleMapStage created along the way is stored in shuffleIdToMapStage for reuse
val parents = getOrCreateParentStages(rdd, jobId)
//Take the next stage id (nextStageId + 1)
val id = nextStageId.getAndIncrement()
//With the parent stages and the other core parameters, build the ShuffleMapStage
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
//Register the new ShuffleMapStage in stageIdToStage
stageIdToStage(id) = stage
//Also register it in shuffleIdToMapStage so that if this shuffle comes up
//again later, the stage can be fetched directly instead of being recreated
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
//Update jobIds and jobIdToStageIds
updateJobIdStageIdMaps(jobId, stage)
//Register the shuffle with the MapOutputTrackerMaster on the driver
//(its shuffleStatuses)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// A previously run stage generated partitions for this shuffle, so for each output
// that's still available, copy information about that output location to the new stage
// (so we don't unnecessarily re-compute that data).
//Copy the shuffle's existing output information from the driver's
//MapOutputTrackerMaster (a shuffleId -> shuffle status mapping); later,
//job submission uses it to decide whether a map stage is already complete
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
//Once the ShuffleMapStage is created, the shuffle is registered in the
//driver's MapOutputTrackerMaster (shuffleStatuses), which is used later for
//validation and for the reduce side to fetch the map outputs
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
//Finally, return the newly created ShuffleMapStage
stage
}
At this point the ShuffleMapStages have been created. They are not created in a single pass: whenever a shuffle is encountered, ShuffleMapStages are created recursively from the bottom up.
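To visualize that bottom-up recursion, take a lineage with two shuffles (assuming an existing sc). createResultStage recurses from collect() back to the first shuffle, so the earliest ShuffleMapStage is created first:

// stage 0 (ShuffleMapStage): parallelize + map, up to the first reduceByKey
// stage 1 (ShuffleMapStage): shuffle 1's output + map, up to the second reduceByKey
// stage 2 (ResultStage):     reads shuffle 2's output for collect()
val result = sc.parallelize(1 to 100)
  .map(i => (i % 10, i))
  .reduceByKey(_ + _)                // shuffle 1
  .map { case (k, v) => (v % 7, k) }
  .reduceByKey(_ min _)              // shuffle 2
  .collect()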
That's all for today. If I've gotten anything wrong, corrections are very welcome!