一 Driver状态改变
1. 触发移除的终止状态:DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED(收到其他状态会直接抛出异常)
2. 移除流程(removeDriver)
2.1 按id查找driver(找不到时只记录一条警告日志)
2.2 从drivers缓存中移除该driver
2.3 放入已完成列表completedDrivers(列表已满时先淘汰最早的约10%,至少1个)
2.4 从持久化引擎中移除
2.5 将driver状态设置为最终状态(DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED),并记录异常信息
2.6 从该driver所在的worker上移除该driver(移除的是driver,不是worker)
2.7 调用schedule()重新调度
case DriverStateChanged(driverId, state, exception) =>
  // A driver may only report one of the four terminal states here. The matched state is
  // bound to `terminal` and handed to removeDriver. Anything else is rejected with an exception.
  state match {
    case terminal @ (DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED) =>
      removeDriver(driverId, terminal, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
/**
 * Removes a driver that has reached a terminal state and records it as completed.
 *
 * If a driver with `driverId` is tracked in `drivers`, it is:
 *  - moved into the bounded `completedDrivers` history;
 *  - removed from the persistence engine;
 *  - stamped with `finalState` and `exception`;
 *  - detached from the worker it was assigned to, if any.
 * A new scheduling round is then triggered. An unknown id is only logged as a warning.
 *
 * @param driverId   id of the driver to remove
 * @param finalState terminal state to record on the driver
 * @param exception  optional failure cause to record on the driver
 */
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]): Unit = {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      // Keep the completed-driver history bounded. When it is full, evict the oldest ~10%
      // (at least one). The count is clamped to the current size so that a
      // RETAINED_DRIVERS <= 0 setting never asks trimStart to drop more entries than exist.
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.min(math.max(RETAINED_DRIVERS / 10, 1), completedDrivers.size)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      // Record the terminal state (ERROR / FINISHED / KILLED / FAILED) and its cause.
      driver.state = finalState
      driver.exception = exception
      // Remove this driver from the worker it ran on, if it had one.
      // This detaches the driver; it does not remove the worker.
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
}
二 Executor状态改变
// Handles a status report for executor `execId` of application `appId`. It:
//  - records the new state on the executor;
//  - forwards the update to the application's driver;
//  - for finished states, detaches the executor and may fail the whole application after
//    too many abnormal exits.
// A scheduling round runs after every update for a known executor.
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
// Look the executor up through its application. None if either the app or the executor is unknown.
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) =>
// Safe: execOption is Some only if idToApp contains appId.
val appInfo = idToApp(appId)
// Remember the previous state so the transition into RUNNING can be validated below.
val oldState = exec.state
// NOTE(review): the new state is stored before the assert below, so an illegal transition
// still leaves `state` recorded on the executor when the AssertionError is thrown.
exec.state = state
// The only legal transition into RUNNING is from LAUNCHING. Reaching RUNNING also resets
// the application's executor retry counter.
if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor $execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}
// Forward the status update to the application's driver.
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
if (ExecutorState.isFinished(state)) { // per the original note: KILLED, FAILED, LOST, EXITED (ExecutorState.isFinished, not shown)
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// If an application has already finished, preserve its
// state to display its information properly on the UI
if (!appInfo.isFinished) { // app still live (per the original note: WAITING or RUNNING; ApplicationInfo.isFinished not shown)
// Only a live application has the executor removed from it. A finished application keeps
// the executor so the UI can still show it.
appInfo.removeExecutor(exec)
}
// Always detach the executor from the worker that hosted it.
exec.worker.removeExecutor(exec)
// Only an explicit exit code of 0 is a normal exit. None or any non-zero code counts as abnormal.
val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// `&&` short-circuits, so the retry counter is incremented only on abnormal exits (even when
// MAX_EXECUTOR_RETRIES < 0). Keep incrementRetryCount() in this position.
if (!normalExit
&& appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
&& MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
// Give up on the application only if none of its remaining executors is RUNNING.
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED) // abnormal exit and retry limit reached: remove the app as FAILED
}
}
}
schedule() // run a scheduling round after any update for a known executor
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
/**
 * Removes an application from the master's bookkeeping and records it as completed.
 *
 * Does nothing unless `app` is currently in `apps`. Otherwise, in order, it:
 *  - drops the app from all lookup tables and from the waiting queue;
 *  - appends it to the bounded `completedApps` history, unregistering the metrics sources
 *    of any history entries evicted to make room;
 *  - kills all of its executors and marks it finished with `state`;
 *  - notifies its driver unless `state` is FINISHED;
 *  - removes it from the persistence engine and runs a scheduling round;
 *  - finally tells every worker that the application has finished.
 *
 * @param app   application to remove
 * @param state final state to record on the application
 */
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value): Unit = {
  if (apps.contains(app)) {
    logInfo(s"Removing app ${app.id}")
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    // Keep the completed-app history bounded. When it is full, evict the oldest ~10% (at least
    // one) and unregister their metrics sources. The count is clamped to the current size so
    // that a RETAINED_APPLICATIONS <= 0 setting never asks trimStart to drop more entries
    // than exist.
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.min(math.max(RETAINED_APPLICATIONS / 10, 1), completedApps.size)
      completedApps.take(toRemove).foreach { a =>
        applicationMetricsSystem.removeSource(a.appSource)
      }
      completedApps.trimStart(toRemove)
    }
    completedApps += app // Remember it in our history
    waitingApps -= app
    for (exec <- app.executors.values) {
      killExecutor(exec)
    }
    app.markFinished(state)
    // A normally finished app needs no ApplicationRemoved notice. Any other final state is
    // reported to the driver.
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }
    persistenceEngine.removeApplication(app)
    schedule()
    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}