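`schedule()` distributes the currently available resources among waiting applications. It is called every time a new application joins or resource availability changes. Why reschedule when resources change? Because some applications (or drivers) may have been left in the waiting state for lack of resources, and once resources change the Master can retry scheduling them.
Drivers are scheduled first, because they take strict precedence over executors. The Master shuffles the workers whose state is ALIVE, then assigns each waiting driver, in round-robin order, to the next worker that has enough free memory and cores. After that it schedules executors.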
```scala
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  // If this Master is not ALIVE (e.g. STANDBY or still recovering), return immediately
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  // Shuffle the ALIVE workers so that drivers are not always placed on the same ones
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  /*************************** Driver scheduling begins ***************************/
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver) // assign this worker to the driver and start the driver on it
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  /*************************** Driver scheduling ends *****************************/
  startExecutorsOnWorkers() // then schedule executors for the waiting apps
}
```
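To see the round-robin driver placement in isolation, here is a minimal sketch that reduces the loop above to plain Scala. `SimWorker`, `SimDriver`, and `DriverRoundRobin.place` are hypothetical stand-ins, not Spark classes. The sketch assumes that, as with Spark's `WorkerInfo.addDriver`, launching a driver reduces the worker's free memory and cores, so later drivers in the same pass see the updated values.

```scala
import scala.util.Random

// Hypothetical stand-ins for Spark's WorkerInfo and DriverInfo.
final class SimWorker(val id: String, var memoryFree: Int, var coresFree: Int)
final case class SimDriver(id: String, mem: Int, cores: Int)

object DriverRoundRobin {
  /** Places each waiting driver on the next worker (round-robin, continuing from the last
    * position) that has enough free memory and cores. Returns driverId -> workerId for the
    * drivers that were placed; drivers that fit nowhere are absent and stay waiting. */
  def place(workers: Seq[SimWorker], waiting: Seq[SimDriver], rng: Random): Map[String, String] = {
    val shuffled = rng.shuffle(workers) // like Random.shuffle over the ALIVE workers
    val n = shuffled.size
    var curPos = 0
    val placed = scala.collection.mutable.LinkedHashMap.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      while (visited < n && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          // Assumed bookkeeping: the driver's resources are now in use on this worker.
          w.memoryFree -= driver.mem
          w.coresFree -= driver.cores
          placed(driver.id) = w.id
          launched = true
        }
        curPos = (curPos + 1) % n // the next driver starts from the following worker
      }
    }
    placed.toMap
  }
}
```

With two 4096 MB / 4-core workers and drivers `d1` and `d2` (2048 MB, 2 cores each), consecutive drivers land on different workers, while an 8192 MB driver fits nowhere. In Spark that driver would simply stay in `waitingDrivers` until a later `schedule()` call finds room for it.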
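`launchDriver` assigns the worker to the driver, sends a `LaunchDriver` message to the worker so that it starts the driver, and sets the driver's state to RUNNING.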
```scala
private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)   // record the driver on the worker
  driver.worker = Some(worker)
  // Ask the worker to start the driver process
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
```
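`startExecutorsOnWorkers` schedules executors on workers for the waiting applications, in FIFO order. An application gets new executors in this round only if both of the following hold:
- The cores the app still needs (`app.coresLeft`) are at least the cores each executor should get (`coresPerExecutor`, 1 if unset). Why? If the remaining cores are not enough to start one more executor, no new executor is created.
- There are usable workers, i.e. workers that are ALIVE, have at least the app's per-executor memory free (`app.desc.memoryPerExecutorMB`), and have at least `coresPerExecutor` free cores. The usable workers are sorted by free cores, most first.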
```scala
/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
    // app.coresLeft: cores the app still needs; coresPerExecutor: cores each executor should get
    if (app.coresLeft >= coresPerExecutor) { // otherwise no new executors are scheduled for this app
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Schedule executors on the usable workers: returns the cores assigned on each worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      // Launch executors according to the cores assigned on each worker
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
```
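The two admission checks can be sketched as a pure function. `WorkerRes` and `UsableWorkers.select` are hypothetical names, and the `alive` flag stands in for `WorkerState.ALIVE`; this is an illustration of the filter above, not Spark's API.

```scala
// Hypothetical simplified worker record; `alive` stands in for WorkerState.ALIVE.
final case class WorkerRes(id: String, alive: Boolean, memoryFree: Int, coresFree: Int)

object UsableWorkers {
  /** None when the app's remaining cores cannot fill one more executor; otherwise the
    * workers able to host one executor, sorted by free cores, most first. */
  def select(
      workers: Seq[WorkerRes],
      coresLeft: Int,
      coresPerExecutorOpt: Option[Int],
      memoryPerExecutorMB: Int): Option[Seq[WorkerRes]] = {
    val coresPerExecutor = coresPerExecutorOpt.getOrElse(1) // 1 core when unset
    if (coresLeft < coresPerExecutor) None
    else Some(
      workers
        .filter(_.alive)
        .filter(w => w.memoryFree >= memoryPerExecutorMB && w.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree)
        .reverse)
  }
}
```

For example, a dead worker and a worker with only 512 MB free are dropped when each executor needs 1024 MB and 2 cores, and the remaining workers come back with the one that has the most free cores first.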
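(1) There are two modes for launching executors. The first spreads an application's executors across as many workers as possible; the second does the opposite and packs them onto as few workers as possible. The first is the default (`spark.deploy.spreadOut = true`, passed in as `spreadOutApps`), because it is usually better for data locality.
(2) The number of cores per executor is configurable. When it is set explicitly, multiple executors of the same application may be launched on the same worker, as long as the worker has enough cores and memory. Otherwise, each executor by default grabs all the cores available on the worker, so at most one executor per application is launched on each worker in a single scheduling round.
(3) Note that even when `spark.executor.cores` is not set, one application may still end up with multiple executors on the same worker. For example, suppose appA and appB each have one executor on worker1 and appA.coresLeft > 0. When appB finishes and releases all its cores on worker1, appA launches a new executor in the next scheduling round that grabs all the free cores on worker1, so appA now has multiple executors running on worker1.
(4) It is important to allocate coresPerExecutor cores on a worker at a time, rather than one core at a time. Consider a cluster of 4 workers with 16 cores each, where the user requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If cores were allocated one at a time, the 48 cores would be spread evenly over the 4 workers, so each worker would assign 12 cores to the application. Since 12 < 16, no executor could be launched [SPARK-8881].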
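The arithmetic in (4) can be checked with a small simulation. This is a simplified model, not Spark's code: `AllocationGranularity` is a hypothetical name, and `assign` hands out `step` cores at a time round-robin across workers (spread-out style), ignoring memory and executor limits.

```scala
object AllocationGranularity {
  /** Round-robin `step` cores at a time across workers until `totalCores` are handed out
    * or no worker has `step` unassigned cores left. */
  def assign(workerCores: Array[Int], totalCores: Int, step: Int): Array[Int] = {
    val assigned = new Array[Int](workerCores.length)
    var remaining = totalCores
    var progress = true
    while (remaining >= step && progress) {
      progress = false
      for (i <- workerCores.indices) {
        if (remaining >= step && workerCores(i) - assigned(i) >= step) {
          assigned(i) += step
          remaining -= step
          progress = true
        }
      }
    }
    assigned
  }

  /** Executors that can actually start: each needs `coresPerExecutor` cores on one worker. */
  def launchable(assigned: Array[Int], coresPerExecutor: Int): Int =
    assigned.map(_ / coresPerExecutor).sum
}
```

With four 16-core workers and 48 requested cores, a step of 1 yields 12 cores per worker and 0 launchable 16-core executors, while a step of 16 yields 16, 16, 16, 0 and exactly the 3 executors the user asked for.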
```scala
/**
 * Schedule executors to be launched on the workers.
 * Returns an array containing number of cores assigned to each worker.
 *
 * There are two modes of launching executors. The first attempts to spread out an application's
 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
 * on as few workers as possible). The former is usually better for data locality purposes and is
 * the default.
 *
 * The number of cores assigned to each executor is configurable. When this is explicitly set,
 * multiple executors from the same application may be launched on the same worker if the worker
 * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
 * worker by default, in which case only one executor per application may be launched on each
 * worker during one single schedule iteration.
 * Note that when `spark.executor.cores` is not set, we may still launch multiple executors from
 * the same application on the same worker. Consider appA and appB both have one executor running
 * on worker1, and appA.coresLeft > 0, then appB is finished and release all its cores on worker1,
 * thus for the next schedule iteration, appA launches a new executor that grabs all the free
 * cores on worker1, therefore we get multiple executors from appA running on worker1.
 *
 * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
 * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
 * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
 * allocated at a time, 12 cores from each worker would be assigned to each executor.
 * Since 12 < 16, no executors would launch [SPARK-8881].
 */
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor // allocate minCoresPerExecutor cores at a time, see (4)
        assignedCores(pos) += minCoresPerExecutor
        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }
        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores // number of cores assigned on each usable worker
}
```
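To compare the spread-out and packed modes side by side, here is a standalone port of `scheduleExecutorsOnWorkers` with Spark's types replaced by plain parameters. `FreeRes` and `ExecutorPlacement.schedule` are hypothetical names; `existingExecutors` and `executorLimit` stand in for `app.executors.size` and `app.executorLimit`. The loop logic is copied from the method above.

```scala
// Hypothetical simplified worker resources (stand-in for WorkerInfo).
final case class FreeRes(coresFree: Int, memoryFree: Int)

object ExecutorPlacement {
  def schedule(
      workers: Array[FreeRes],
      coresLeft: Int,
      coresPerExecutor: Option[Int],
      memoryPerExecutor: Int,
      existingExecutors: Int,
      executorLimit: Int,
      spreadOut: Boolean): Array[Int] = {
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val n = workers.length
    val assignedCores = new Array[Int](n)
    val assignedExecutors = new Array[Int](n)
    var coresToAssign = math.min(coresLeft, workers.map(_.coresFree).sum)

    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = workers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val enoughMemory =
          workers(pos).memoryFree - assignedExecutors(pos) * memoryPerExecutor >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + existingExecutors < executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        keepScheduling && enoughCores // only growing an existing executor
      }
    }

    var freeWorkers = (0 until n).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor
          if (oneExecutorPerWorker) assignedExecutors(pos) = 1
          else assignedExecutors(pos) += 1
          if (spreadOut) keepScheduling = false // move on to the next worker
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }
}
```

With three 8-core workers, 12 requested cores and 2 cores per executor, spreading out gives 4, 4, 4 while packing gives 8, 4, 0. With an executor limit of 2, only the first two workers get one executor each. Without `coresPerExecutor`, each worker gets one executor that grows by one core per round.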
`allocateWorkerResourceToExecutors` creates the executor descriptions, launches the executors, and sets `app.state` to `ApplicationState.RUNNING`.
```scala
/**
 * Allocate a worker's resources to one or more executors.
 * @param app the info of the application which the executors belong to
 * @param assignedCores number of cores on this worker for this application
 * @param coresPerExecutor number of cores per executor
 * @param worker the worker info
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign) // create the executor description
    launchExecutor(worker, exec) // launch the executor on the worker
    app.state = ApplicationState.RUNNING // mark the app as RUNNING
  }
}
```
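The split rule above can be isolated in a few lines. `ExecutorSplit.coresPerLaunchedExecutor` is a hypothetical helper that returns the cores each launched executor receives; it mirrors the `numExecutors` and `coresToAssign` computation in `allocateWorkerResourceToExecutors`.

```scala
object ExecutorSplit {
  /** Cores given to each executor launched on one worker for one app. */
  def coresPerLaunchedExecutor(assignedCores: Int, coresPerExecutor: Option[Int]): Seq[Int] = {
    // Fixed size: assignedCores / coresPerExecutor executors. Unset: one executor takes all.
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresEach = coresPerExecutor.getOrElse(assignedCores)
    Seq.fill(numExecutors)(coresEach)
  }
}
```

Since `scheduleExecutorsOnWorkers` always assigns cores in steps of `coresPerExecutor`, the division leaves no remainder in practice: 8 assigned cores with 2 cores per executor become four 2-core executors, and 8 cores with no setting become a single 8-core executor.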
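`launchExecutor` records the executor on the worker, sends a `LaunchExecutor` message to the worker so that it starts the executor, and notifies the application's driver with an `ExecutorAdded` message.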
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Ask the worker to start the executor
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Tell the application's driver that a new executor has been added
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```