背景
以前一直以为task本地行,将task分配到哪一个executor上是taskschedular做的事情,知道看了DagSchedular和TaskSchedular之后才知道,DagSchedular是负责上层调度,task具体运行在那个executor上,而TaskSchedular是决定task在ececutor上是process级别的本地行还是memory级别的本地行,因为在DagSchedular会有在sparkcontext初始化时候创建的blockManager对象,所以DagSchedular是可以获取rdd.partition缓存信息的,并且决定在那个executor上运行。
DagSchedular负责为task分配cpu等资源信息,并发送任务到executor上,coarseDrainExecutorBackend中的receive方法就会调用launchTask 运行task,该executor是在sparkcontext初始化的时候从main函数启动的,SchrdularBackEnd拥有所有executor上的cores信息,并会把该信息传递给DagSchedular。
过程
1.起点:taskScheduler.submitTasks()
DagSchedular进行上层stage划分之后,将stage转换成tasksets,并且向TaskSchedular进行提交,注意,在提交tasksets的时候,DagSchedular已经完成了上层task任务运行的本地性调度,比如某一个task应该在哪个executor上运行,而TaskSchedular是进行底层的本地性,比如:线程级别、内存级别和其他级别。因为DagSchedular内部含有一个blockManager对象,可以获得block信息(rdd)的缓存情况,所以executor级别的本地性是由DagSchedular完成的
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
submit方法如下所示,核心代码由三部分,一个tasksetManager创建,然后是将tasksetManager加入schdularBuilder队列,schdularBuilder队列是在TaskSchdular创建初始化的时候实例化的,存在两种实现:FIFO和FAIR。最后一段核心代码就是使用backend发送触发信息,该触发信号主要是通知SchedularBackend 获取现有的资源,调用TaskSchedular使用资源分配task
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { //核心代码1 创建TasksetManager , 创建方式 new create val manager = createTaskSetManager(taskSet, maxTaskFailures) //省略代码1,conflect manager检测 //核心代码2 将taskSetManager添加进队列 schedulableBuilder初始化注意一下 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) //此处省略一部分代码2 } //核心代码3 发送触发器 backend.reviveOffers() }
2.SchedularBackend.reviveOffers() 转发消息
这里的资源信息获取,只是获取已有资源信息的详情,而不是向master申请资源,简单的原理就是遍历现有executor,收集所有可用的cores信息,因为在DagSchedular阶段就已经将task分配到了具体的executor,所以我们只需要遍历一下tasksetManager中的executorId列表,然后去获取一下就行了,具体获取操作是backend对象去执行的。
reviveOffers代码如下,其中driverEndPoint是SchedularBackEnd一个内部类的实例,负责与executor(包含本机)进行交互,值得注意的是本次函数调用,是Driver节点,也就是本节点,自己调用的driverEndpoint.send方法给自己,在方法内部有很多case条件,可以区分是本机发送的还是executor发送的信息。
override def reviveOffers() { driverEndpoint.send(ReviveOffers) }
DriverEndPoint receive接收代码如下:
override def receive: PartialFunction[Any, Unit] = { //省略其他case case ReviveOffers => //本次case makeOffers() //省略其他case }
3.SchedularBackEnd.DriverEndPoint.makeOffers().资源信息获取
该函数主要作用在于收集现有executor的资源信息,并且将资源信息反向传递给TaskSchedular对象,由TaskSchedular对象进行具体的任务资源匹配。
private def makeOffers() { // Make sure no executor is killed while some task is launching on it val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // 获取可用活跃节点和该节点的资源信息:闲置cpu数目 val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq //调用resourceOffers 进行任务资源分配 scheduler.resourceOffers(workOffers) } if (!taskDescs.isEmpty) { launchTasks(taskDescs) } }
4.TaskSchedular.resourceOffers() 回调,使用资源信息分配给task
此段函数主要作用:
a.打乱核心资源信息,出发点为了负载均衡
b.从shecularBuilder队列中获取datsetsManager,以某种策略(FIFO/FAIR)
c.循环分配资源,resourceOffserSingleTask函数,会遍历一遍executor,分配任务
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { //省略一段代码,用于检测是否有新的executor或者需要更新信息 //省略代码,黑名单node节点检测 //负载金衡操作,打乱executor资源顺序 val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray //从队列里获取tasksetManager,该队列是schedulableBUulder创建的 val sortedTaskSets = rootPool.getSortedTaskSetQueue //省略代码,将上面检测出的新的executor节点,添加到tasksetManager //核心代码分配任务资源 for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } if (!launchedAnyTask) { taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
其中遍历每一个taskSet,并按照LocalityLevels从高到低尝试本地性优先级,resourceOffersSingleTaskSet按照当前遍历到的taskset和本地性条件,分配资源,这段代码里的do-while的作用会在后面提到。
for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false for (currentMaxLocality <- taskSet.myLocalityLevels) { //为什么需要do-while呢??? 后面会提到 do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) }
5.TaskSchedular.resourceOfferSingleTaskSet()
这段函数中遍历每一个exrcutor,然后遍历符合当前本地性条件的taskset,但是因为resourceOffers只返回了一个taskset,所以这个函数调用的作用就是,为每一个executor 分配之多一个taskset,这是为了负载均衡,所以调用调用一个该函数,遍历所有的executor,但是不会把所有可分配的task都分配出去,上文中do-while的作用就在于此,每次之多给executor分配一个task,循环多次,保证负载均衡。
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false //遍历每一个executor for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host if (availableCpus(i) >= CPUS_PER_TASK) { try { //贼神奇的代码,这个for循环之循环一次,因为resourceOffer值返回了一个taskset for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. return launchedTask } } } return launchedTask }
总结
额外资料:
其他人写的这方面的文章,下面这篇幅挺好的
https://www.jianshu.com/p/ba11e9aef0a6