在前面的一篇文章中介绍了stage的划分算法以及task对应的partition的最佳位置计算算法。在DAGScheduler中将stage划分好之后,然后TaskScheduler会将taskSet中的task提交到executor中去执行,那么TaskScheduler是怎么样将task提交到executor中执行,又是如何如何分配executor的呢?本篇主要围绕TaskScheduler的task调度来剖析TaskScheduler的源码。
这里接着上篇文章,在DAGScheduler中的submitMissingTasks方法中调用TaskScheduler中的创建TaskSet并且调用submitTasks方法提交task:
//调用taskScheduler的submitTask创建TaskSet提交task
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
在TaskSchedulerImpl中调用方法submitTasks:
/**
* 提交task
* @param taskSet
*/
override def submitTasks(taskSet: TaskSet) {
//获取taskSet中所有的task
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//创建taskSetManager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}
hasReceivedTask = true
}
//SparkDeploySchedulerBackend创建AppClient,向Master注册application
//SparkDeploySchedulerBackend继承自CoarseGrainedSchedulerBackend类
//实际上是调用了CoarseGrainedSchedulerBackend类的reviveOffers方法
backend.reviveOffers()
}
在这里会创建一个TaskSetManager,它的主要作用是:
调度在TaskSchedulerImpl的单一的TaskSet中的tasks,这个类跟踪这每一个task,如果task任务提交失败就一直重复提交直到超过限定的次数,并且通过延迟调度的策略,为每一个TaskSet处理本地化调度机制。它的主要接口是resourceOffer,在这个接口中,TaskSet希望每一个节点运行一个task,并且接收任务的状态改变消息,来知道它负责的task状态的改变。
在这个方法中最后使用TaskScheduler底层创建的SparkDeploySchedulerBackend类,这个类创建AppClient,向Master注册application。这里调用SparkDeploySchedulerBackend类的reviveOffers方法。而SparkDeploySchedulerBackend继承自CoarseGrainedSchedulerBackend类,因此调用CoarseGrainedSchedulerBackend类的reviveOffers方法:
override def reviveOffers() {
driverActor ! ReviveOffers
}
在这个方法中发送ReviveOffers消息给driverActor,在driverActor中调用makeOffers()方法对消息进行处理:
case ReviveOffers =>
makeOffers()
// Make fake resource offers on all executors
def makeOffers() {
/**
* 第一步,调用TaskSchedulerImpl的resourceOffers方法,执行任务分配算法,将各个task随机分配到空闲的executor上去
* 第二步,分配好task到executor上之后,执行自己的launchTask方法,将task发送消息到对应的executor上去
* 由executor启动并执行task
*
* resourceOffers方法传入的是这个application的所有可用的executor,并且将其封装成workerOffer
* 每一个WorkerOffer代表了每一个executor可用的cpu资源数量。
*/
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
}
在这里会调用TaskSchedulerImpl的resourceOffers方法进行task的分配调度,因此这个方法就是分配算法的核心:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
activeExecutorIds += o.executorId
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
//首先将可用的executor进行shuffle,也就是打散,尽量做到负载均衡
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)
// Build a list of tasks to assign to each worker.
//然后针对WorkerOffer创建出一些有用的东西,比如tasks,可以理解为一个二维数组ArrayBuffer,每一个元素又是一个ArrayBuffer
//而且每一个ArrayBuffer的数量固定,也就是这个executor可用的cpu数量
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
//在TaskSchedulerImpl初始化的时候会创建一个调度池,并且将TaskSet放入调度池中进行调度
//在这里会有顺序的取出调度池中的TaskSet
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
/**
* 任务调度算法的核心
* 双重遍历for循环,遍历所有的taskSet,以及每一种本地化级别
* 本地化级别有多种:
* PROCESS_LOCAL,即rdd的partition和task在同一个executor中,速度最快
* NODE_LOCAL,rdd的partition和task不在一个executor中,即不在一个进程中,但是在一个worker节点中
* NO_PREF,没有所谓的本地化级别
* RACK_LOCAL,机架本地化,rdd的partition和task在一个机架上
* ANY,任意的本地化级别
* 上面的这些本地化级别,性能逐渐较低
*
* 遍历所有的taskSet,从最优的本地化级别开始遍历
*/
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
//对于当前taskSet
//尝试优先使用最小的本地化级别,将taskSet上的task在executor上启动
//如果启动不了,那么就跳出循环,进入下一种本地化级别,也就是放大本地化级别
//依次类推,直到将taskSet在某些本地化级别下,让task在executor上全部启动
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
在TaskSchedulerImpl中对task进行调度分配之后,到executor会执行每一个task,那么,本篇文章对于taskScheduler的task分配算法的剖析就到这里,后面会接着本篇文章对executor进行剖析。