Spark-Core源码学习记录
该系列作为Spark源码回顾学习的记录,旨在捋清Spark分发程序运行的机制和流程,对部分关键源码进行追踪,争取做到知其所以然,对枝节部分源码仅进行文字说明,不深入下钻,避免混淆主干内容。
前面的篇章中,我们完成了Master与Worker的注册启动、Driver和Executor的注册启动,以及Application的注册与启动;初始化了SparkContext、SchedulerBackend、TaskScheduler,最终通过schedule()方法完成硬件资源的分配。万事俱备,只欠东风:应用如何被划分成Stage,Stage又如何分发成具体的Task交给Executor执行?下面我们就从JavaWordCount应用程序末尾的 output = counts.collect(); 说起。
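为了有个直观的血缘参照,先给出一个与JavaWordCount等价的最小示例(示意代码,用Scala改写,输入路径等均为假设):reduceByKey产生宽依赖,collect是真正触发作业的Action算子。
import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCountSketch").setMaster("local[*]"))
    val counts = sc.textFile("input.txt")   // 假设的输入路径
      .flatMap(_.split(" "))                // 窄依赖
      .map((_, 1))                          // 窄依赖
      .reduceByKey(_ + _)                   // ShuffleDependency,后面Stage划分的边界
    val output = counts.collect()           // Action算子,内部最终调用sc.runJob
    output.foreach(println)
    sc.stop()
  }
}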
从一个Action算子开始
collect、count等Action算子内部最终都会调用SparkContext的runJob方法,这里以count为例。我们省略掉内部的多次周转,直接展现最终的调用:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
// ... 省略runJob内部各种调用,下面是最终的调用
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
* 这是一个所有action算子的主要入口
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// 原来最终是靠 dagScheduler完成,这才是我们下面要关注的重点
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
// checkpoint,为了RDD的复用
rdd.doCheckpoint()
}
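顺带一提,runJob也可以在应用代码里直接调用,下面是一个小示例(示意):count其实就等价于对每个分区统计元素个数再求和。
val rdd = sc.parallelize(1 to 10, 2)
// 使用runJob的简化重载:对每个分区执行函数,结果按分区索引收集成数组
val sizes: Array[Long] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.size.toLong)
println(sizes.sum) // 10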
查看DAGScheduler的runJob方法:
def runJob[T, U](...): Unit = {
// Submit an action job to the scheduler.
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
// Preferred alternative to `Await.ready()`
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
}
进入submitJob方法
/**
* Submit an action job to the scheduler.
* @return a JobWaiter object that can be used to block until the job finishes executing or can be used to cancel the job.
*/
def submitJob[T, U](...): JobWaiter[U] = {
val jobId = nextJobId.getAndIncrement()
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
// 实例化一个 JobWaiter,内部是一些状态记录的成员
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// 还记得前面实例化 DAGScheduler的时候,提及的 eventProcessLoop,类似于Rpc中的 Dispatcher,通过一个循环线程来处理一个队列 eventQueue中的消息
// 此处post就是往 eventQueue中放入一个模板类 JobSubmitted,等待循环线程来处理就可以
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
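这里返回的JobWaiter值得多看一眼:runJob拿到它之后会awaitReady等待completionFuture完成。它的机制大致如下(示意代码,非源码原文,类名SimpleJobWaiter为假设):内部用计数器跟踪已完成的分区,每个分区的结果先交给resultHandler,全部完成后让Promise成功,等待方随之返回。
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// JobWaiter机制示意(假设的简化版)
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  // 0个task的job直接视为成功
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise()

  def completionFuture: Future[Unit] = jobPromise.future

  // 每完成一个分区就会回调一次
  def taskSucceeded(index: Int, result: Any): Unit = {
    synchronized { resultHandler(index, result.asInstanceOf[U]) }
    if (finishedTasks.incrementAndGet() == totalTasks) jobPromise.success(())
  }

  def jobFailed(exception: Exception): Unit = jobPromise.tryFailure(exception)
}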
我们回想一下eventProcessLoop的逻辑:循环从eventQueue中取出具体事件,然后调用doOnReceive(event)进行处理,具体细节可回顾前篇文章。
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
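eventProcessLoop本质上就是“阻塞队列 + 单个守护线程”的事件循环。脱离Spark来看,这个机制的最小示意如下(非Spark源码,类名为假设):post只负责入队,事件线程循环取出事件交给doOnReceive。
import java.util.concurrent.LinkedBlockingDeque

// 事件循环机制示意(假设的简化版)
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        val event = eventQueue.take() // 阻塞直到有事件
        doOnReceive(event)
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event) // 生产者只负责入队
  protected def doOnReceive(event: E): Unit
}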
即将进入handleJobSubmitted方法,接下来的内容均非常重要,包括Stage划分和Task分发等。
Stage
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_], // 触发count算子的RDD
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// 这里开始划分Stage,内容非常多,我们在下面单独展开
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {...}
// 简单的封装,其中包含了上面的 finalStage
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
// 清除被持久化的RDD分区的位置
clearCacheLocs()
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
// 状态记录
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 正式提交Stage
submitStage(finalStage)
}
下面分别对createResultStage和submitStage进行追踪:
/**
* Create a ResultStage associated with the provided jobId.
*/
private def createResultStage(...): ResultStage = {
// 获得ResultStage的父stage,内部循环嵌套,下面会展开
val parents = getOrCreateParentStages(rdd, jobId)
// 经过上面的方法,该RDD所有父辈都被划分为不同的Stages,下面就是对仅剩的这个RDD封装为 ResultStage
// 获取一个自增ID,实例化 ResultStage,内部有字段和当前jobId绑定。因此ResultStage和job是一一对应
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
// 完成上面所有的过程就将 ResultStage返回
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
// 遍历获取当前RDD的父依赖,
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
// 返回值容器
val parents = new HashSet[ShuffleDependency[_, _, _]]
// 记录访问过的RDD
val visited = new HashSet[RDD[_]]
// 充当stack,用于递归调用
val waitingForVisit = new ArrayStack[RDD[_]]
// 从最后一个RDD开始,先压栈,再循环内部出栈
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
// 当前RDD的 dependencies,将 ShuffleDependency加入结果容器,将窄依赖对应的父RDD入栈,等待访问
// dependencies是在每一个RDD实例化时被初始化,记录所有的父依赖关系 A list of dependencies on other RDDs
// Dependency是一个虚类,它的两个实现为 NarrowDependency ShuffleDependency
// Dependency的RDD成员其实是父RDD,或者说,当前RDD把自身封装到Dependency,传递给子RDD
// 所以不要把 Dependency和 RDD搞混
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
经过上面的递归调用,从最后的RDD开始向前遍历,“所有”的宽依赖都被加入到parents容器返回。当然这里其实并不是所有:根据逻辑,遍历到宽依赖后,当前分支就不再往前寻找宽依赖了。简单来说,对于A<==B<==C,从C出发找到B、C之间的宽依赖就结束了,A、B之间的宽依赖并没有被记录到。至于A、B之间的宽依赖,不急,下面马上就会出现。不过目前我们只需关注这个返回容器就够了,因为Stage的划分是以宽依赖为界。接着进入getOrCreateShuffleMapStage方法:
{
getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
// shuffleIdToMapStage维护所有的Stage和shuffleId,重要
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
// 匹配的到,证明这个ShuffleDependency之前已经被构建在某个Stage中
case Some(stage) =>
stage
// 这里才是新建
case None =>
// Create stages for all missing ancestor shuffle dependencies.
// 分两部分进行,首先是对当前 ShuffleDependency所有父类往上的划分Stage。
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
// 然后是把自身加入Stage中,其实这就是最后一个Stage
createShuffleMapStage(shuffleDep, firstJobId)
}
}
这里先关注 getMissingAncestorShuffleDependencies方法,然后再关注内部的 createShuffleMapStage方法:
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// 类似上面寻找 ShuffleDependency的过程,用栈来完成递归,我们直接去关注结果部分
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
// 注意,这里开始遍历上面提到B前面的宽依赖
getShuffleDependencies(toVisit).foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
ancestors.push(shuffleDep)
// 此时是直接把宽依赖的RDD加入待遍历容器,这样整个循环下来,才是所有的宽依赖都被遍历
waitingForVisit.push(shuffleDep.rdd)
} // Otherwise, the dependency and its ancestors have already been registered.
}
}
}
ancestors
}
简单来理解,getShuffleDependencies根据传入的RDD向前查找一层宽依赖;getMissingAncestorShuffleDependencies内部再对每个宽依赖的父RDD继续调用getShuffleDependencies,又往前走一层。整个过程下来,所有缺失的宽依赖都被记录在ancestors容器中,这才是完全体。
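用一个具体的小例子体会一下(示意代码,假设在spark-shell中执行):两次reduceByKey产生两个宽依赖,最终会被划分成两个ShuffleMapStage加一个ResultStage。
// 两个shuffle的血缘:A <== B <== C(<==表示宽依赖)
val a = sc.parallelize(1 to 100).map(x => (x % 10, x))
val b = a.reduceByKey(_ + _)                                   // 第一个ShuffleDependency(A、B之间)
val c = b.map { case (k, v) => (v % 3, k) }.reduceByKey(_ + _) // 第二个ShuffleDependency(B、C之间)
c.collect()
// 对c调用getShuffleDependencies只能找到B、C之间的宽依赖;
// A、B之间的宽依赖由getMissingAncestorShuffleDependencies在创建父Stage时补齐。
// 也可以用 println(c.toDebugString) 查看血缘,每多一层缩进对应一次shuffle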
现在该返回到createShuffleMapStage(dep, firstJobId)了,其实就是对所有的宽依赖调用该方法,进入方法内部:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
// 内部又是一个往前递归的调用,可以理解为,想给当前 ShuffleDependency划分Stage,得先保证它前面的宽依赖都被划分过了,也就是说StageId至少是从前往后递增的
val parents = getOrCreateParentStages(rdd, jobId)
// 对每个宽依赖新生成一个ID,所以说Stage是以宽依赖为划分依据的
val id = nextStageId.getAndIncrement()
// 实例化 ShuffleMapStage,内部记录的有jobid
// 同时,每个Stage都包含所有的父Stage
// 最重要的是内部维护一个 pendingPartitions,记录还未被计算过的分区列表,下面会用到
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
// 维护容器信息
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
// 把Shuffle信息注册到Driver的MapOutputTrackerMaster上
// 内部 shuffleStatuses用于维护shuffleId和ShuffleStatus信息
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
现在可以回到handleJobSubmitted中,执行最后的submitStage(finalStage)方法:
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
...// 省略一些判断以及父Stage是否初始化的问题
submitMissingTasks(stage, jobId.get)
...
}
private def submitMissingTasks(stage: Stage, jobId: Int) {
// Stage的实现类 ResultStage重写该方法,(0 until job.numPartitions)
// 实现类 ShuffleMapStage也重写该方法,内部是通过 mapOutputTrackerMaster获取pendingPartitions分区列表
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
//遍历每个partitionId,根据Id和这个stage的RDD调用Task最佳位置划分算法
// getPreferredLocs首先会通过 getCacheLocs查询BlockManager是否持久化过,若有就去Driver端找BlockManagerMaster获取地址
// 否则就会去查找是否checkpoint过,若有就可能会去hdfs直接获取
// 若都没持久化过,就会去找MapOutputTracker查找之前在map端写入的shuffle文件的地址
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {...}
// 更新latestInfo等信息
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
// 序列化为二进制广播变量
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
var taskBinaryBytes: Array[Byte] = null
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
// task二进制超过警戒大小时打印警告
if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
logWarning(s"Broadcasting large task binary with size " +
s"${Utils.bytesToString(taskBinaryBytes.length)}")
}
// 广播出去
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {...}
val tasks: Seq[Task[_]] = try {
//指标检测对象序列化
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
// 遍历待计算的分区列表
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
// 每个分区对应一个ShuffleMapTask
val part = partitions(id)
stage.pendingPartitions += id
// 实例化ShuffleMapTask,包含重写的runTask方法
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
// 实例化ResultTask,包含重写的runTask方法
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {...}
if (tasks.size > 0) {
//调用 taskScheduler的 submitTasks方法,参数为简单的封装类实例 TaskSet,注意是将一个Stage下全部tasks作为数组封装进一个TaskSet
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {...}
}
经过上面的步骤,每个宽依赖都会被封装成一个Stage;每个Stage中的不同分区被封装成多个Task;最后每个Stage的全部Task又被封装成一个TaskSet,提交给taskScheduler的实现类TaskSchedulerImpl。下面就进入submitTasks方法。
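顺带看一眼TaskSet本身,它只是一个很薄的封装类,大致如下(凭记忆整理,以Spark 2.4源码为参考,字段可能随版本略有差异):
private[spark] class TaskSet(
    val tasks: Array[Task[_]],      // 该Stage本次attempt要执行的全部task
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,              // 即jobId,FIFO调度时用作优先级
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}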
Task
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
this.synchronized {
// 内部实例化 TaskSetManager,成员包括 schedulableQueue,记录StageId以及Tasks
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
// 记录进 taskSetsByStageIdAndAttempt中
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
//将 manager加入到调度器中,还记得前面文章中提到的FIFO调度器
// 还记得前面介绍到FIFO时发现rootpool什么也没干,具体怎么调度下面即将揭晓
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 一个定时线程用于检查Task是否启动
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {...}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// 这里backend根据之前的篇章,是一个CoarseGrainedSchedulerBackend对象,下面展开
backend.reviveOffers()
}
override def reviveOffers() {
// 直接给driver发送消息
driverEndpoint.send(ReviveOffers)
}
case ReviveOffers =>
makeOffers()
下面看一看makeOffers方法:
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
// 将可用的Executor的内存cpu等信息封装成模板类 WorkerOffer,包括 executorHost地址信息
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
// 对封装好的 workOffers开始调度合理分配Task,然后返回具体的 taskDescs
// 此方法非常重要,下面单独展开
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
// 终于要启动Task了,也放在后面单独展开
launchTasks(taskDescs)
}
}
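这里封装的WorkerOffer也只是一个简单的资源描述,大致形如下面这样(凭记忆整理的示意定义,以实际源码为准):
private[spark] case class WorkerOffer(
    executorId: String,
    host: String,
    cores: Int,                 // 该Executor当前空闲的core数
    // address是可选的hostPort字符串,barrier调度时需要用到
    address: Option[String] = None)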
现在分别对resourceOffers和launchTasks这两个重要方法进行追踪:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) { // 遍历传入的WorkerOffer
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToRunningTaskIds.contains(o.executorId)) {//当前executor是否被分配task
// 当前executor即将被使用,维护executor、host、taskid关系
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
// executor可能运行多个task
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
// 维护主机和机架的关系
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// 此处省略了黑名单过滤的逻辑,filteredOffers即过滤后的offers
// 打乱顺序,尽量让Task分散到不同的Worker上
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// 每个worker根据其cpu核数,计算最多可分配的task个数
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// 所有worker加起来最多task个数
val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
// rootPool终于开始干活了,对之前放入的TaskSet按调度策略(FIFO/FAIR)排序
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
//有新的executor加入,需要重新计算每个task的本地性等级,内部调用TaskSetManager本地级别的分配算法,下面详细展开
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {...} else {
var launchedAnyTask = false //两个标志位
// Record all the executor IDs assigned barrier tasks on.
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
for (currentMaxLocality <- taskSet.myLocalityLevels) {// 本地性等级在上面计算
var launchedTaskAtCurrentMaxLocality = false // 两个标志位
do {
// resourceOfferSingleTaskSet内部遍历每个worker,在taskSet中根据本地性等级,
// 分配一个最合适的task,封装成TaskDescription,同时维护Executor的剩余资源信息
// 下面会详细展开
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
// 直到当前本地性等级下没有task可分配,或内部出错,才返回false结束循环
} while (launchedTaskAtCurrentMaxLocality)
}
...
}
}
// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
// launched within a configured time.
if (tasks.size > 0) {
hasLaunchedTask = true
}
// tasks是一个二维数组,Seq[Seq[TaskDescription]]
return tasks
}
先来关注taskSet.executorAdded()方法:
def executorAdded() {
recomputeLocality()
}
def recomputeLocality() {
// myLocalityLevels是由computeValidLocalityLevels()计算出的本地性等级数组,这里先取出当前索引对应的等级备用
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}
// 计算本地性等级
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
// 在初始化TaskSetManager的时候,会把需要执行的所有task加入到pendingTasksForExecutor中
// 就是判断Taskset中的executor是否包含在executorIdToRunningTaskIds中,能对应上就说明在同一JVM中有可用的Executor
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
...// 其他等级判断类似
}
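被省略的其余等级判断逻辑与PROCESS_LOCAL类似,大致如下(按记忆补全的示意,细节以源码为准):有待执行task所在的主机上存在存活Executor则加入NODE_LOCAL;有不关心位置的task则加入NO_PREF;机架上有存活主机则加入RACK_LOCAL;ANY总是兜底。
if (!pendingTasksForHost.isEmpty &&
    pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
  levels += NODE_LOCAL   // 同一主机上有存活的Executor
}
if (!pendingTasksWithNoPrefs.isEmpty) {
  levels += NO_PREF      // 存在不关心位置的task
}
if (!pendingTasksForRack.isEmpty &&
    pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
  levels += RACK_LOCAL   // 同一机架上有存活的主机
}
levels += ANY            // 兜底等级,总是存在
levels.toArray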
可以看出,关键在于pendingTasksForExecutor的内容,下面就来看看这个容器是如何被初始化的:
private[spark] class TaskSetManager(...){
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
val tasks = taskSet.tasks
// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
// 其实调用该方法来初始化 pendingTasksForExecutor
addPendingTask(i)
}
/** Add a task to all the pending-task lists that it should be on. */
private[spark] def addPendingTask(index: Int) {
// preferredLocations的默认值是Nil,往回看看taskSet的初始化是否赋值,下面展开
for (loc <- tasks(index).preferredLocations) {
// 根据得到的值,维护 pendingTasksForExecutor JVM级别
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
// 维护 pendingTasksForHost主机级别
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
// 维护 pendingTasksForRack机架级别
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
// 这里记录是所有的需要执行的task
allPendingTasks += index // No point scanning this whole list to find the old task there
}
preferredLocations方法内部涉及东西错综复杂,目前先理解其大概意思即可。暂时跳过,留作后续补充。此处放一节其他博文的解释:
DAGScheduler计算数据本地性的时候,巧妙地借助了RDD自身getPreferredLocations中的数据来最大化效率,因为getPreferredLocations表明了每个Partition的数据本地性;虽然当前Partition可能被persist或者checkpoint,但默认情况下persist或checkpoint的位置与getPreferredLocations给出的数据本地性是一致的,这就极大地简化了Task数据本地性算法的实现和效率优化。
task的最佳位置计算是怎么实现的?事实上是getPreferredLocs内部调用了getPreferredLocsInternal方法:
1.判断rdd的partition是否被访问过,如果被访问过,则什么都不做
2.然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回
3.如果没有cache,则调用rdd.getPreferedLocations方法,获取RDD partition的最佳位置
4.遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法,即从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列。
5.如果该stage中rdd,从最后一个rdd,到最开始的rdd的partition都没有被cache或者checkpoint,那么,task的最佳位置为Nil,即没有最佳位置
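把上面几步串起来,getPreferredLocsInternal的逻辑骨架大致如下(示意伪实现,非源码原文,依赖DAGScheduler上下文中的getCacheLocs等方法):
import scala.collection.mutable.HashSet

// 最佳位置查找的逻辑骨架示意
private def preferredLocsSketch(
    rdd: RDD[_], partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  if (!visited.add((rdd, partition))) return Nil                   // 1. 已访问过,直接返回
  val cached = getCacheLocs(rdd)(partition)                        // 2. DAGScheduler缓存的位置
  if (cached.nonEmpty) return cached
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)) // 3. RDD自身的最佳位置(含checkpoint)
  if (rddPrefs.nonEmpty) return rddPrefs.map(TaskLocation(_))
  rdd.dependencies.foreach {                                       // 4. 沿窄依赖向父RDD递归
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = preferredLocsSketch(n.rdd, inPart, visited)
        if (locs.nonEmpty) return locs
      }
    case _ =>
  }
  Nil                                                              // 5. 都没有,则没有最佳位置
}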
现在返回到resourceOffers中,resourceOfferSingleTaskSet方法内部:
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
var launchedTask = false
//遍历executor
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
// resourceOffer内部封装TaskDescription,下面展开
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
// 维护一下各类信息,tasks里已经是 TaskDescription
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager.put(tid, taskSet)
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}
来关注其中的resourceOffer方法
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] ={
if (!isZombie && !offerBlacklisted) {
val curTime = clock.getTimeMillis()
// 拿到当前最优的本地性等级,越小越好
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
// 遍历executor本地化级别匹配的所有task
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// 维护信息,生成 TaskInfo模板类
val task = tasks(index)
val taskId = sched.newTaskId()
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Serialize and return the task,省略
// 加入 runningTasksSet
addRunningTask(taskId)
// 内部 eventProcessLoop.post(BeginEvent(task, taskInfo))
// 自动处理 BeginEvent事件,不过这个事件也只是加了一下监听内容,不重要
sched.dagScheduler.taskStarted(task, info)
// 最终将所有信息封装为 TaskDescription返回
// 内容包括task、host、executor信息
new TaskDescription(taskId, attemptNum, execId, taskName, index, task.partitionId, addedFiles, addedJars, task.localProperties, serializedTask)
}
} else {
None
}
}
现在终于可以回到makeOffers内部的launchTasks方法:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
// 序列化超限
if (serializedTask.limit() >= maxRpcMessageSize) {...}
else {
// 维护对应的executor信息
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
//向executor发送 LaunchTask消息
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
消息到达Executor端后,由CoarseGrainedExecutorBackend的receive方法匹配处理(省略executor为null的判断):
case LaunchTask(data) =>
// 反序列化传输过来的TaskDescription
val taskDesc = TaskDescription.decode(data.value)
// 开始在executor自己进程中启动task
executor.launchTask(this, taskDesc)
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
// 这里会把TaskDescription和ExecutorBackend(默认是:CoarseGrainedExecutorBackend)
// 封装成继承Runnable的TaskRunner
val tr = new TaskRunner(context, taskDescription)
// 放入负责维护所有正在此executor上运行的task的ConcurrentHashMap中
runningTasks.put(taskDescription.taskId, tr)
// 执行TaskRunner
threadPool.execute(tr)
}
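到这里,Task就交给了Executor进程内的线程池去执行。这个“ConcurrentHashMap登记 + 线程池执行Runnable”的模式可以用下面的最小示意来理解(非Spark源码,仅演示机制;真正的TaskRunner还会反序列化taskBinary、调用task.run()并把结果回传Driver):
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Executor端执行Task的机制示意(假设的简化版)
object TaskRunnerSketch {
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  private val threadPool = Executors.newCachedThreadPool() // Spark中是带命名前缀的daemon线程池

  class TaskRunner(taskId: Long) extends Runnable {
    override def run(): Unit = {
      try {
        println(s"running task $taskId") // 真实实现中这里是反序列化并运行task
      } finally {
        runningTasks.remove(taskId)      // 运行结束后从登记表移除
      }
    }
  }

  def launchTask(taskId: Long): Unit = {
    val tr = new TaskRunner(taskId)
    runningTasks.put(taskId, tr)  // 登记正在运行的task
    threadPool.execute(tr)        // 提交到线程池执行
  }
}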