Content List
NodeStatusUpdaterImpl.startStatusUpdater.new Runnable.run
The NodeManager receives the list of applications to clean up in the heartbeat response from the ResourceManager (response.getApplicationsToCleanup()).
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
The event type of CMgrCompletedAppsEvent is ContainerManagerEventType.FINISH_APPS:
/**
 * Builds a {@code FINISH_APPS} event for the container manager.
 *
 * @param appsToCleanup the applications that must be finished and cleaned up
 * @param reason who or what requested the cleanup (e.g. ON_SHUTDOWN or
 *     BY_RESOURCEMANAGER); used later to pick a diagnostic message
 */
public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup, Reason reason) {
  super(ContainerManagerEventType.FINISH_APPS);
  this.reason = reason;
  this.appsToCleanup = appsToCleanup;
}
NodeManager.serviceInit
Events of type ContainerManagerEventType are handled by containerManager:
dispatcher.register(ContainerManagerEventType.class, containerManager);
ContainerManagerImpl.handle
For each application ID, it sends an ApplicationFinishEvent whose diagnostic reflects why the application was finished.
public void handle(ContainerManagerEvent event) {
switch (event.getType()) {
case FINISH_APPS:
CMgrCompletedAppsEvent appsFinishedEvent =
(CMgrCompletedAppsEvent) event;
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
String diagnostic = "";
if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
diagnostic = "Application killed on shutdown";
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
diagnostic));
}
break;
ApplicationFinishEvent is a subclass of ApplicationEvent whose event type is ApplicationEventType.FINISH_APPLICATION:
public class ApplicationFinishEvent extends ApplicationEvent {
private final String diagnostic;
/**
* Application event to abort all containers associated with the app
* @param appId to abort containers
* @param diagnostic reason for the abort
*/
public ApplicationFinishEvent(ApplicationId appId, String diagnostic) {
super(appId, ApplicationEventType.FINISH_APPLICATION);
this.diagnostic = diagnostic;
}
ContainerManagerImpl.serviceInit
dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher());
ApplicationEventDispatcher
ApplicationEventDispatcher forwards every event to the corresponding application, or logs a warning if that application is absent. The implementation class of Application is ApplicationImpl.
/**
 * Routes each {@link ApplicationEvent} to the {@code Application} registered in
 * the NM context under the event's application ID. Events for applications that
 * are not (or no longer) known are dropped with a warning.
 */
class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
  @Override
  public void handle(ApplicationEvent event) {
    Application target = ContainerManagerImpl.this.context.getApplications()
        .get(event.getApplicationID());
    if (target == null) {
      // Unknown application: nothing to deliver to, just report it.
      LOG.warn("Event " + event + " sent to absent application "
          + event.getApplicationID());
      return;
    }
    target.handle(event);
  }
}
ApplicationImpl.handle
ApplicationImpl uses a state machine; each event triggers a transition while the write lock is held.
/**
 * Feeds an event into this application's state machine while holding the
 * write lock, and logs the resulting state change.
 *
 * <p>If the state machine rejects the event (InvalidStateTransitonException),
 * the rejection is logged as a warning and no state change is reported.
 *
 * @param event the application event to process
 */
public void handle(ApplicationEvent event) {
  this.writeLock.lock();
  try {
    ApplicationId applicationID = event.getApplicationID();
    LOG.debug("Processing " + applicationID + " of type " + event.getType());
    ApplicationState oldState = stateMachine.getCurrentState();
    ApplicationState newState = null;
    try {
      newState = stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      // Event cannot be handled in the current state; it is dropped.
      LOG.warn("Can't handle this event at current state", e);
    }
    // newState stays null when the transition was rejected above; only log a
    // transition that actually happened (previously this logged "... to null").
    if (newState != null && oldState != newState) {
      LOG.info("Application " + applicationID + " transitioned from "
          + oldState + " to " + newState);
    }
  } finally {
    this.writeLock.unlock();
  }
}
ApplicationImpl.stateMachineFactory
When an ApplicationEventType.FINISH_APPLICATION event is received in the RUNNING state, the state machine calls AppFinishTriggeredTransition.transition:
addTransition(
ApplicationState.RUNNING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.FINISH_APPLICATION,
new AppFinishTriggeredTransition())
ApplicationImpl.AppFinishTriggeredTransition
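When the application has no containers, it calls app.handleAppFinishWithContainersCleanedup() and transitions to the ApplicationState.APPLICATION_RESOURCES_CLEANINGUP state.
Otherwise, it sends a ContainerKillEvent for each of the application's containers and transitions to ApplicationState.FINISHING_CONTAINERS_WAIT.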
/**
 * Transition taken on FINISH_APPLICATION.
 *
 * <p>If the application still has containers, a ContainerKillEvent is sent for
 * each of them and the application waits in FINISHING_CONTAINERS_WAIT.
 * Otherwise, application-level cleanup is triggered immediately and the
 * application moves to APPLICATION_RESOURCES_CLEANINGUP.
 */
static class AppFinishTriggeredTransition
    implements
    MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
  @Override
  public ApplicationState transition(ApplicationImpl app,
      ApplicationEvent event) {
    ApplicationFinishEvent finishEvent = (ApplicationFinishEvent) event;
    if (!app.containers.isEmpty()) {
      // The diagnostic is the same for every container of the application.
      String killDiagnostic = "Container killed on application-finish event: "
          + finishEvent.getDiagnostic();
      for (ContainerId id : app.containers.keySet()) {
        app.dispatcher.getEventHandler().handle(
            new ContainerKillEvent(id,
                ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
                killDiagnostic));
      }
      return ApplicationState.FINISHING_CONTAINERS_WAIT;
    }
    // Nothing left to kill: go straight to application-level cleanup.
    app.handleAppFinishWithContainersCleanedup();
    return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
  }
}
ApplicationImpl.handleAppFinishWithContainersCleanedup
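It sends two events: DESTROY_APPLICATION_RESOURCES, to delete application-level localized resources, and APPLICATION_STOP, to tell auxiliary services that the application is done.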
/**
 * Starts application-level cleanup once no containers remain. Two events are
 * dispatched, in this order:
 * <ol>
 *   <li>DESTROY_APPLICATION_RESOURCES, to delete the app's localized resources;</li>
 *   <li>APPLICATION_STOP, to tell auxiliary services the app is done.</li>
 * </ol>
 */
void handleAppFinishWithContainersCleanedup() {
  ApplicationLocalizationEvent destroyResources = new ApplicationLocalizationEvent(
      LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this);
  dispatcher.getEventHandler().handle(destroyResources);

  AuxServicesEvent stopAuxServices =
      new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId);
  dispatcher.getEventHandler().handle(stopAuxServices);
  // TODO: Trigger the LogsManager
}
ContainerManagerImpl
ContainerManagerImpl registers its ResourceLocalizationService instance as the handler for LocalizationEventType.
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
ResourceLocalizationService.handle
/**
 * Dispatches a localization event to the handler for its event type.
 *
 * <p>Only the DESTROY_APPLICATION_RESOURCES case is shown; the other cases are
 * elided ("// ..."). Any event type without a matching case is rejected.
 *
 * @param event the localization event to process
 * @throws YarnRuntimeException if the event type has no matching case
 */
public void handle(LocalizationEvent event) {
// TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) {
// ...
case DESTROY_APPLICATION_RESOURCES:
// NOTE(review): the cast assumes every DESTROY_APPLICATION_RESOURCES event is
// an ApplicationLocalizationEvent -- confirm for all senders.
handleDestroyApplicationResources(
((ApplicationLocalizationEvent)event).getApplication());
break;
default:
throw new YarnRuntimeException("Unknown localization event: " + event);
}
}
ResourceLocalizationService.handleDestroyApplicationResources
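For each local directory returned by dirsHandler.getLocalDirsForCleanup(), it calls submitDirForDeletion to delete {localDir}/usercache/{userName}/appcache/{appId} and {localDir}/nmPrivate/{appId}. It then sends an APPLICATION_RESOURCES_CLEANEDUP event back to the application.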
/**
 * Schedules deletion of a finished application's local directories and then
 * notifies the application that its resources have been cleaned up.
 *
 * <p>For every directory returned by dirsHandler.getLocalDirsForCleanup(), two
 * directories are submitted for deletion: the user-owned app dir
 * (localDir/USERCACHE/user/APPCACHE/app) and the NM-private app dir
 * (localDir/NM_PRIVATE_DIR/app). Afterwards an
 * APPLICATION_RESOURCES_CLEANEDUP event is dispatched for the application.
 *
 * @param application the finished application whose directories are removed
 */
private void handleDestroyApplicationResources(Application application) {
  // Read once; the previous version re-assigned userName and appIDStr with
  // identical values and fetched the app ID a second time.
  String userName = application.getUser();
  ApplicationId appId = application.getAppId();
  // NOTE(review): the directory name comes from application.toString(),
  // presumably the application ID string -- confirm against Application impls.
  String appIDStr = application.toString();

  // Delete the application directories
  for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
    // Delete the user-owned app-dir
    Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
    Path userdir = new Path(usersdir, userName);
    Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
    Path appDir = new Path(allAppsdir, appIDStr);
    submitDirForDeletion(userName, appDir);

    // Delete the nmPrivate app-dir (submitted with a null user)
    Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
    Path appSysDir = new Path(sysDir, appIDStr);
    submitDirForDeletion(null, appSysDir);
  }

  // TODO: decrement reference counts of all resources associated with this
  // app
  dispatcher.getEventHandler().handle(new ApplicationEvent(
      appId,
      ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
}
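getLocalDirsForCleanup returns the good local directories together with the full ones, so that directories on full disks are cleaned up as well: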
/**
 * Returns the local directories that should be considered for cleanup.
 *
 * <p>Full directories are included alongside the good ones so that data left
 * on full disks is cleaned up as well.
 *
 * @return the concatenation of the good and the full local directories
 */
public List<String> getLocalDirsForCleanup() {
  List<String> goodDirs = localDirs.getGoodDirs();
  List<String> fullDirs = localDirs.getFullDirs();
  return DirectoryCollection.concat(goodDirs, fullDirs);
}
ResourceLocalizationService.submitDirForDeletion
/**
 * Submits a directory to the deletion service if it currently exists.
 *
 * <p>This is best-effort: failures are logged and never propagated. A missing
 * directory is the normal case (e.g. it was never created on this disk) and is
 * skipped silently. Any other I/O failure is now logged instead of being
 * swallowed, so permission or disk problems stay visible.
 *
 * @param userName owner to delete as, or {@code null} for NM-owned dirs
 * @param dir the directory to delete
 */
private void submitDirForDeletion(String userName, Path dir) {
  try {
    // Existence probe: lfs is presumably a FileContext, whose getFileStatus
    // throws FileNotFoundException for a missing path.
    lfs.getFileStatus(dir);
    delService.delete(userName, dir, new Path[] {});
  } catch (UnsupportedFileSystemException ue) {
    LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
  } catch (java.io.FileNotFoundException ignored) {
    // Expected: nothing to delete.
  } catch (IOException ie) {
    LOG.warn("Unable to check local dir " + dir
        + " before deletion; skipping it", ie);
  }
}
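DeletionService.delete schedules the physical deletion of the directory after debugDelay seconds and records the task in the state store. Nothing is scheduled at all when debugDelay is -1.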
/**
 * Schedules asynchronous deletion of {@code subDir} after {@code debugDelay}
 * seconds, recording the task in the state store first. When
 * {@code debugDelay} is -1 nothing is scheduled and the directory is kept.
 *
 * @param user user to perform the deletion as (may be {@code null})
 * @param subDir the path to delete
 * @param baseDirs optional base directories passed to the deletion task; an
 *     empty or {@code null} array is forwarded as {@code null}
 */
public void delete(String user, Path subDir, Path... baseDirs) {
  // TODO if parent owned by NM, rename within parent inline
  if (debugDelay == -1) {
    return;
  }
  boolean noBaseDirs = baseDirs == null || baseDirs.length == 0;
  List<Path> baseDirList = noBaseDirs ? null : Arrays.asList(baseDirs);
  FileDeletionTask task =
      new FileDeletionTask(this, user, subDir, baseDirList);
  recordDeletionTaskInStateStore(task);
  sched.schedule(task, debugDelay, TimeUnit.SECONDS);
}