A TaskSet comes into being after stage division is complete: one task is created for every partition of a stage, and the task type depends on the stage type. The tasks covering all partitions of a stage form one TaskSet, which is submitted through the TaskScheduler.
The TaskScheduler hands the submitted TaskSet to its CoarseGrainedSchedulerBackend, which assigns Executors according to the tasks' locality levels; each Executor then runs its tasks in a thread pool.
submitMissingTasks()
Once the stages have been divided and every stage's ancestors (parents) are known, all tasks can be executed, starting from the first stage.
An RDD action triggers sc.runJob -> the DAGScheduler event loop -> submitStage() -> submitMissingTasks().
submitMissingTasks() creates the tasks for a stage: it reads the stage's number of partitions and creates one task per partition.
// Compute the preferred locations for every partition/task
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  // The catch body was elided in the original excerpt; in the Spark source a
  // failure here records a new (aborted) stage attempt and aborts the stage:
  case NonFatal(e) =>
    stage.makeNewStageAttempt(partitionsToCompute.size)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}
// Create one task per partition: ResultTasks for the finalStage,
// ShuffleMapTasks for every other stage.
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}
Preferred locations for a partition
A preferred location means the task should run as close to its data as possible, so the data does not have to be moved. getPreferredLocs() finds one as follows (see the sketch after this list):
- First, if the RDD's partition is cached, return the locations of the cached blocks.
- Next, if the RDD itself reports preferredLocations (this consults the checkpoint first, and also covers input RDDs such as HadoopRDD), return those locations.
- Otherwise, if the RDD has narrow dependencies, recurse into the parent RDDs' partitions and check whether they are cached or checkpointed.
- If no preferred location can be found, return Nil.
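A minimal standalone sketch of that lookup order, modeled on DAGScheduler.getPreferredLocsInternal (the real method also threads a visited set through the recursion to avoid re-checking the same (rdd, partition) pair; cacheLocsOf is a hypothetical stand-in for the DAGScheduler's private cache-location lookup):
import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.TaskLocation

def preferredLocs(
    rdd: RDD[_],
    partition: Int,
    cacheLocsOf: (RDD[_], Int) => Seq[TaskLocation]): Seq[TaskLocation] = {
  // 1. If the partition is cached, prefer the executors holding the block.
  val cached = cacheLocsOf(rdd, partition)
  if (cached.nonEmpty) return cached
  // 2. Ask the RDD itself; this consults the checkpoint first, and covers
  //    input RDDs such as HadoopRDD, which report the HDFS block hosts.
  val own = rdd.preferredLocations(rdd.partitions(partition))
  if (own.nonEmpty) return own.map(TaskLocation(_))
  // 3. Recurse into narrow dependencies, taking the first non-empty answer.
  rdd.dependencies.foreach {
    case narrow: NarrowDependency[_] =>
      narrow.getParents(partition).foreach { parentPartition =>
        val locs = preferredLocs(narrow.rdd, parentPartition, cacheLocsOf)
        if (locs.nonEmpty) return locs
      }
    case _ => // a shuffle dependency marks a stage boundary: stop searching
  }
  // 4. No preferred location found.
  Nil
}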
After one task has been created for every partition of the current stage, tasks.toArray (together with the stage and job information) is wrapped into a TaskSet and submitted to the TaskScheduler for execution:
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
TaskSchedulerImpl.submitTasks()
When the TaskScheduler submits a TaskSet, it:
- creates a TaskSetManager object (under org.apache.spark.scheduler);
- takes the stageId from the TaskSet and records the [stageAttemptId -> TaskSetManager] mapping for that stage in the taskSetsByStageIdAndAttempt in-memory cache;
- adds the TaskSetManager to the FIFOSchedulableBuilder or FairSchedulableBuilder created during initialize();
- most importantly, calls backend.reviveOffers().
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Mark every existing TaskSetManager for this stage as a zombie, since
    // the new attempt supersedes them.
    stageTaskSets.foreach { case (_, ts) =>
      ts.isZombie = true
    }
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
TaskSetManager
A TaskSetManager schedules a single TaskSet within TaskSchedulerImpl.
It keeps track of every task, retries tasks that fail, and performs locality-aware scheduling for the TaskSet via delay scheduling.
Its main interface is resourceOffer(), which asks whether this TaskSet wants to run a task on a given node, together with the status-update path that tells the TaskSetManager one of its tasks has changed state.
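For reference, the shape of that interface (signature as in org.apache.spark.scheduler.TaskSetManager):
// Offer one executor's resources to this TaskSet: either return a task to
// launch on it, or None if no pending task fits this offer at (or below)
// the given maximum locality level.
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality): Option[TaskDescription]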
locality-aware
A task's locality level is one of, from best to worst: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
- PROCESS_LOCAL: process-local. The RDD partition and the task are in the same Executor process; this is the fastest level.
- NODE_LOCAL: the RDD partition and the task are not in the same process, but are on the same worker node.
- NO_PREF: the task has no locality preference and runs equally well anywhere.
- RACK_LOCAL: the RDD partition and the task are on the same rack.
- ANY: no constraint; the task may run anywhere in the cluster.
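How long the scheduler waits at one level before degrading to the next is governed by delay scheduling and the spark.locality.wait settings. A small illustrative snippet (the configuration keys are real Spark settings; the values shown are just the defaults):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")         // base wait applied at each level
  .set("spark.locality.wait.process", "3s") // before giving up PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")    // before giving up NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")    // before giving up RACK_LOCAL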
reviveOffers()
The backend here is the one initialized in SparkContext.createTaskScheduler() according to the master URL. In standalone mode:
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
StandaloneSchedulerBackend itself, however, does not define reviveOffers(); the method lives in the CoarseGrainedSchedulerBackend it extends, which sends a ReviveOffers message to its internal driverEndpoint and then receives that message itself.
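In the Spark source that method is essentially a one-liner:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
The DriverEndpoint then receives its own message: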
case ReviveOffers =>
  makeOffers()
What finally runs is CoarseGrainedSchedulerBackend.makeOffers():
// Make fake resource offers on all executors
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}
makeOffers()
- From activeExecutors, it wraps each live Executor's free resources into a WorkerOffer object;
- it then passes the sequence of WorkerOffers to TaskSchedulerImpl.resourceOffers(), which performs the actual assignment;
- finally, it launches the assigned tasks on the offered Executors (WorkerOffer itself is shown below).
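WorkerOffer is just a small value class describing the free resources on one executor (fields as in org.apache.spark.scheduler.WorkerOffer):
case class WorkerOffer(
    executorId: String,
    host: String,
    cores: Int,
    // the executor's host:port, used e.g. to record addresses for barrier tasks
    address: Option[String] = None)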
makeOffers – resourceOffers()
TaskSchedulerImpl.resourceOffers() receives those WorkerOffers, which describe the resources of every Executor currently available to the application. It is called to allocate resources on the cluster's slaves, assigning tasks to Executors in round-robin fashion across the nodes so that the load is balanced.
- resourceOffers() first records the host and executor of each WorkerOffer, filters out offers that cannot be used (for example from blacklisted executors), and then shuffles the remaining offers.
- For the shuffled offers it then:
  builds one task buffer per offer, which will hold the TaskDescriptions assigned to that executor;
  extracts the list of available CPU counts from the offers;
  and repeatedly calls resourceOfferSingleTaskSet(), which reserves CPUS_PER_TASK cores per launched task,
  thereby binding tasks and executors together.
resourceOffers() is the core of the task resource-allocation mechanism; a condensed sketch of its main loop follows.
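The sketch below is simplified from TaskSchedulerImpl.resourceOffers: blacklist handling and barrier bookkeeping are dropped, and locals such as filteredOffers and addressesWithDescs are assumed from the enclosing method.
// Randomize the offers so tasks spread evenly across executors.
val shuffledOffers = shuffleOffers(filteredOffers)
// One buffer of TaskDescriptions per offer, sized by how many tasks its cores can hold.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(_.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue // FIFO or FAIR order
for (taskSet <- sortedTaskSets) {
  // Walk the locality levels from most local to least local, and keep
  // re-offering at the current level until nothing more can be launched there.
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    var launchedTaskAtCurrentMaxLocality = true
    while (launchedTaskAtCurrentMaxLocality) {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus,
        tasks, addressesWithDescs)
    }
  }
}
// `tasks` now holds, per WorkerOffer, the TaskDescriptions to launch.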
The final allocation of Executor resources to each TaskSet is done in resourceOfferSingleTaskSet():
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager.put(tid, taskSet)
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          // Only update hosts for a barrier task.
          if (taskSet.isBarrier) {
            // The executor address is expected to be non empty.
            addressesWithDescs += (shuffledOffers(i).address.get -> task)
          }
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  launchedTask
}
makeOffers – launchTasks()
launchTasks() takes each serialized task and sends a LaunchTask message to the chosen executor's executorEndpoint:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
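For context, the surrounding method, condensed from CoarseGrainedSchedulerBackend.launchTasks (the check that aborts a task whose serialized size exceeds the maximum RPC message size is omitted):
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    val executorData = executorDataMap(task.executorId)
    // Book the cores before shipping the task to the chosen executor.
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}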
This executorEndpoint is in fact a CoarseGrainedExecutorBackend; see its companion object, where the endpoint is registered:
val env = SparkEnv.createExecutorEnv(
  driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
When the ExecutorBackend receives the message, it calls Executor.launchTask():
// The CoarseGrainedExecutorBackend passes itself as the first argument.
executor.launchTask(this, taskDesc)
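The receiving side in CoarseGrainedExecutorBackend.receive, condensed (the guard that exits the process when the executor has not been created yet is dropped here):
case LaunchTask(data) =>
  // Decode the TaskDescription that launchTasks() serialized on the driver.
  val taskDesc = TaskDescription.decode(data.value)
  logInfo("Got assigned task " + taskDesc.taskId)
  executor.launchTask(this, taskDesc)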
Finally, the task is started on the executor's thread pool:
// Executor.scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // Wrap the task in a TaskRunner (an inner class of Executor implementing
  // Runnable, which deserializes and runs the task) and hand it to the pool.
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
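What then happens inside the thread pool, heavily condensed from Executor.TaskRunner.run() (class-loader setup, metrics, the DirectTaskResult wrapping, and the FAILED/KILLED paths are all omitted):
override def run(): Unit = {
  // Tell the driver the task has started running.
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  // Deserialize the Task object that was shipped over.
  val ser = env.closureSerializer.newInstance()
  task = ser.deserialize[Task[Any]](
    taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
  // Run it: a ShuffleMapTask writes shuffle output for the next stage,
  // a ResultTask computes part of the action's final result.
  val value = task.run(
    taskAttemptId = taskId,
    attemptNumber = taskDescription.attemptNumber,
    metricsSystem = env.metricsSystem)
  // Serialize the result and report completion back to the driver.
  val serializedResult = env.serializer.newInstance().serialize(value)
  execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
}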