How NodeManager cleans up application logs and intermediate data such as map output

NodeStatusUpdaterImpl.startStatusUpdater.new Runnable.run

NodeManager receives the list of applications to clean up in the heartbeat response from the ResourceManager.

List<ApplicationId> appsToCleanup =
    response.getApplicationsToCleanup();
// Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (!appsToCleanup.isEmpty()) {
  dispatcher.getEventHandler().handle(
      new CMgrCompletedAppsEvent(appsToCleanup,
          CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}

The event type of CMgrCompletedAppsEvent is ContainerManagerEventType.FINISH_APPS.

public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup, Reason reason) {
    super(ContainerManagerEventType.FINISH_APPS);
    this.appsToCleanup = appsToCleanup;
    this.reason = reason;
  }

NodeManager.serviceInit

Events of type ContainerManagerEventType are handled by containerManager.

 dispatcher.register(ContainerManagerEventType.class, containerManager);
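
The registration above ties an event-type enum class to a handler. As a rough illustration of that routing idea, here is a minimal synchronous sketch. It is not YARN's dispatcher: SketchDispatcher, SketchEvent, ManagerEventType and AppEventType are hypothetical stand-ins, and only the constant names FINISH_APPS and FINISH_APPLICATION come from the code quoted in this post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins for the event-type enums; only the constant names
// are taken from the code quoted in this post.
enum ManagerEventType { FINISH_APPS }
enum AppEventType { FINISH_APPLICATION }

// An event carries an enum constant as its type plus an arbitrary payload.
final class SketchEvent {
  final Enum<?> type;
  final Object payload;

  SketchEvent(Enum<?> type, Object payload) {
    this.type = type;
    this.payload = payload;
  }
}

// Routes each event to the handler registered for the enum class of its type.
// Assumption: delivery is synchronous here to keep the sketch short.
final class SketchDispatcher {
  private final Map<Class<?>, Consumer<SketchEvent>> handlers = new HashMap<>();

  void register(Class<? extends Enum<?>> eventTypeClass,
      Consumer<SketchEvent> handler) {
    handlers.put(eventTypeClass, handler);
  }

  void handle(SketchEvent event) {
    Consumer<SketchEvent> handler =
        handlers.get(event.type.getDeclaringClass());
    if (handler == null) {
      throw new IllegalStateException("No handler registered for " + event.type);
    }
    handler.accept(event);
  }
}
```

With a handler registered for ManagerEventType.class and another for AppEventType.class, a FINISH_APPS event reaches only the first and a FINISH_APPLICATION event only the second, which is the pattern the rest of this walkthrough relies on.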

ContainerManagerImpl.handle

For each application ID, it sends an ApplicationFinishEvent.

public void handle(ContainerManagerEvent event) {
    switch (event.getType()) {
    case FINISH_APPS:
      CMgrCompletedAppsEvent appsFinishedEvent =
          (CMgrCompletedAppsEvent) event;
      for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
        String diagnostic = "";
        if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
          diagnostic = "Application killed on shutdown";
        } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
          diagnostic = "Application killed by ResourceManager";
        }
        this.dispatcher.getEventHandler().handle(
            new ApplicationFinishEvent(appID,
                diagnostic));
      }
      break;
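
The FINISH_APPS branch above does two things: it maps the reason to a diagnostic string, and it fans one batch event out into one event per application. A minimal sketch of just that logic, assuming plain strings for application IDs; CleanupReason, PerAppFinish and FinishAppsFanOut are hypothetical names, while the two diagnostic strings are copied from the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of CMgrCompletedAppsEvent.Reason.
enum CleanupReason { ON_SHUTDOWN, BY_RESOURCEMANAGER }

// Hypothetical per-application event: one is produced for every app ID.
final class PerAppFinish {
  final String appId;
  final String diagnostic;

  PerAppFinish(String appId, String diagnostic) {
    this.appId = appId;
    this.diagnostic = diagnostic;
  }
}

final class FinishAppsFanOut {
  // Same reason-to-diagnostic mapping as the excerpt above.
  static String diagnosticFor(CleanupReason reason) {
    switch (reason) {
      case ON_SHUTDOWN:
        return "Application killed on shutdown";
      case BY_RESOURCEMANAGER:
        return "Application killed by ResourceManager";
      default:
        return "";
    }
  }

  // Turns one batch of app IDs into one finish event per app, in order.
  static List<PerAppFinish> fanOut(List<String> appIds, CleanupReason reason) {
    String diagnostic = diagnosticFor(reason);
    List<PerAppFinish> events = new ArrayList<>();
    for (String appId : appIds) {
      events.add(new PerAppFinish(appId, diagnostic));
    }
    return events;
  }
}
```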

ApplicationFinishEvent is a kind of ApplicationEvent; its event type is ApplicationEventType.FINISH_APPLICATION.

public class ApplicationFinishEvent extends ApplicationEvent {
  private final String diagnostic;

  /**
   * Application event to abort all containers associated with the app
   * @param appId to abort containers
   * @param diagnostic reason for the abort
   */
  public ApplicationFinishEvent(ApplicationId appId, String diagnostic) {
    super(appId, ApplicationEventType.FINISH_APPLICATION);
    this.diagnostic = diagnostic;
  }

ContainerManagerImpl.serviceInit

dispatcher.register(ApplicationEventType.class,
        new ApplicationEventDispatcher());

ApplicationEventDispatcher

ApplicationEventDispatcher forwards every event to the corresponding application. The implementation class of Application is ApplicationImpl.

class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {

    @Override
    public void handle(ApplicationEvent event) {
      Application app =
          ContainerManagerImpl.this.context.getApplications().get(
              event.getApplicationID());
      if (app != null) {
        app.handle(event);
      } else {
        LOG.warn("Event " + event + " sent to absent application "
            + event.getApplicationID());
      }
    }
  }
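
The lookup-then-forward pattern in ApplicationEventDispatcher can be sketched on its own. This is only an illustration under simplifying assumptions: application IDs and event types are plain strings, an application is modelled as a Consumer<String>, and AppEventRouter is a hypothetical name. The warning text mirrors the one in the excerpt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical router: forwards an event to the application registered under
// the given ID, or reports that the application is absent.
final class AppEventRouter {
  private final Map<String, Consumer<String>> applications =
      new ConcurrentHashMap<>();

  void addApplication(String appId, Consumer<String> app) {
    applications.put(appId, app);
  }

  // Returns true if the event was delivered, false if no application with
  // that ID is known (the excerpt logs a warning in that case).
  boolean route(String appId, String eventType) {
    Consumer<String> app = applications.get(appId);
    if (app == null) {
      System.err.println(
          "Event " + eventType + " sent to absent application " + appId);
      return false;
    }
    app.accept(eventType);
    return true;
  }
}
```

Returning a boolean is a choice made for this sketch so the absent-application case is easy to observe; the quoted handler returns nothing and only logs.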

ApplicationImpl.handle

ApplicationImpl uses a state machine to process events.

public void handle(ApplicationEvent event) {

    this.writeLock.lock();

    try {
      ApplicationId applicationID = event.getApplicationID();
      LOG.debug("Processing " + applicationID + " of type " + event.getType());

      ApplicationState oldState = stateMachine.getCurrentState();
      ApplicationState newState = null;
      try {
        // queue event requesting init of the same app
        newState = stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.warn("Can't handle this event at current state", e);
      }
      if (oldState != newState) {
        LOG.info("Application " + applicationID + " transitioned from "
            + oldState + " to " + newState);
      }
    } finally {
      this.writeLock.unlock();
    }
  }

ApplicationImpl.stateMachineFactory

When an ApplicationEventType.FINISH_APPLICATION event is received in the RUNNING state, the state machine calls AppFinishTriggeredTransition.transition.

addTransition(
    ApplicationState.RUNNING,
    EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
        ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
    ApplicationEventType.FINISH_APPLICATION,
    new AppFinishTriggeredTransition())

ApplicationImpl.AppFinishTriggeredTransition

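When the application has no running containers, the transition calls app.handleAppFinishWithContainersCleanedup() and moves to the ApplicationState.APPLICATION_RESOURCES_CLEANINGUP state. Otherwise it sends a ContainerKillEvent for every container and moves to ApplicationState.FINISHING_CONTAINERS_WAIT.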

static class AppFinishTriggeredTransition
      implements
      MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
    @Override
    public ApplicationState transition(ApplicationImpl app,
        ApplicationEvent event) {
      ApplicationFinishEvent appEvent = (ApplicationFinishEvent)event;
      if (app.containers.isEmpty()) {
        // No container to cleanup. Cleanup app level resources.
        app.handleAppFinishWithContainersCleanedup();
        return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
      }

      // Send event to ContainersLauncher to finish all the containers of this
      // application.
      for (ContainerId containerID : app.containers.keySet()) {
        app.dispatcher.getEventHandler().handle(
            new ContainerKillEvent(containerID,
                ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
                "Container killed on application-finish event: " + appEvent.getDiagnostic()));
      }
      return ApplicationState.FINISHING_CONTAINERS_WAIT;
    }
  }
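
The two possible target states of this transition can be illustrated without YARN's state machine classes. The sketch below assumes container IDs are plain strings and records side effects in a list instead of dispatching events; SketchApplication and SketchAppState are hypothetical names, and ignoring the event outside RUNNING is a simplification of this sketch only.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical subset of the application states named in this post.
enum SketchAppState {
  RUNNING, FINISHING_CONTAINERS_WAIT, APPLICATION_RESOURCES_CLEANINGUP
}

final class SketchApplication {
  SketchAppState state = SketchAppState.RUNNING;
  final Set<String> containers = new LinkedHashSet<>();
  // Records what the real transition would dispatch as events.
  final List<String> actions = new ArrayList<>();

  // Multiple-arc transition: the target state depends on whether any
  // containers are still tracked for this application.
  SketchAppState onFinishApplication(String diagnostic) {
    if (state != SketchAppState.RUNNING) {
      return state; // simplification: other source states are not modelled
    }
    if (containers.isEmpty()) {
      // No container to clean up, go straight to app-level cleanup.
      actions.add("cleanup-app-resources");
      state = SketchAppState.APPLICATION_RESOURCES_CLEANINGUP;
    } else {
      // Ask for every container to be killed, then wait for them to finish.
      for (String containerId : containers) {
        actions.add("kill " + containerId + ": " + diagnostic);
      }
      state = SketchAppState.FINISHING_CONTAINERS_WAIT;
    }
    return state;
  }
}
```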

ApplicationImpl.handleAppFinishWithContainersCleanedup

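It sends two events: an ApplicationLocalizationEvent of type DESTROY_APPLICATION_RESOURCES to delete application-level resources, and an AuxServicesEvent of type APPLICATION_STOP to tell the auxiliary services that the application is done.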

  void handleAppFinishWithContainersCleanedup() {
    // Delete Application level resources
    this.dispatcher.getEventHandler().handle(
        new ApplicationLocalizationEvent(
            LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));

    // tell any auxiliary services that the app is done 
    this.dispatcher.getEventHandler().handle(
        new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));

    // TODO: Trigger the LogsManager
  }

ContainerManagerImpl

ContainerManagerImpl registers a ResourceLocalizationService instance as the handler for LocalizationEventType.

dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);

ResourceLocalizationService.handle

public void handle(LocalizationEvent event) {
    // TODO: create log dir as $logdir/$user/$appId
    switch (event.getType()) {
    // ...
    case DESTROY_APPLICATION_RESOURCES:
      handleDestroyApplicationResources(
          ((ApplicationLocalizationEvent)event).getApplication());
      break;
    default:
      throw new YarnRuntimeException("Unknown localization event: " + event);
    }
  }

ResourceLocalizationService.handleDestroyApplicationResources

For every ${localdir}, it calls submitDirForDeletion to delete ${localdir}/usercache/${username}/appcache/${appIdStr} and ${localdir}/nmPrivate/${appIdStr}.

private void handleDestroyApplicationResources(Application application) {
    String userName = application.getUser();
    ApplicationId appId = application.getAppId();
    String appIDStr = application.toString();


    // Delete the application directories
    userName = application.getUser();
    appIDStr = application.toString();

    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {

      // Delete the user-owned app-dir
      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, userName);
      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
      Path appDir = new Path(allAppsdir, appIDStr);
      submitDirForDeletion(userName, appDir);

      // Delete the nmPrivate app-dir
      Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
      Path appSysDir = new Path(sysDir, appIDStr);
      submitDirForDeletion(null, appSysDir);
    }

    // TODO: decrement reference counts of all resources associated with this
    // app

    dispatcher.getEventHandler().handle(new ApplicationEvent(
          application.getAppId(),
          ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
  }
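
To make the directory layout concrete, the following sketch computes the two paths submitted for deletion under each local directory. It uses java.nio.file.Path rather than Hadoop's Path class, and AppDirLayout is a hypothetical helper; the directory names usercache, appcache and nmPrivate are the ones named in this post.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the per-application directories to delete.
final class AppDirLayout {
  static final String USERCACHE = "usercache";
  static final String APPCACHE = "appcache";
  static final String NM_PRIVATE_DIR = "nmPrivate";

  static List<Path> dirsToDelete(List<String> localDirs, String user,
      String appId) {
    List<Path> result = new ArrayList<>();
    for (String localDir : localDirs) {
      // User-owned app dir: <localDir>/usercache/<user>/appcache/<appId>
      result.add(Paths.get(localDir, USERCACHE, user, APPCACHE, appId));
      // NodeManager-private app dir: <localDir>/nmPrivate/<appId>
      result.add(Paths.get(localDir, NM_PRIVATE_DIR, appId));
    }
    return result;
  }
}
```

For two local directories this yields four paths, two per directory, in the same order as the loop in handleDestroyApplicationResources.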

getLocalDirsForCleanup:

 public List<String> getLocalDirsForCleanup() {
    return DirectoryCollection.concat(localDirs.getGoodDirs(),
        localDirs.getFullDirs());
  }

ResourceLocalizationService.submitDirForDeletion

  private void submitDirForDeletion(String userName, Path dir) {
    try {
      lfs.getFileStatus(dir);
      delService.delete(userName, dir, new Path[] {});
    } catch (UnsupportedFileSystemException ue) {
      LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
    } catch (IOException ie) {
      // ignore
      return;
    }
  }

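DeletionService.delete schedules a FileDeletionTask that physically deletes the files after debugDelay seconds. When debugDelay is -1, no task is scheduled and the files are kept.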

 public void delete(String user, Path subDir, Path... baseDirs) {
    // TODO if parent owned by NM, rename within parent inline
    if (debugDelay != -1) {
      List<Path> baseDirList = null;
      if (baseDirs != null && baseDirs.length != 0) {
        baseDirList = Arrays.asList(baseDirs);
      }
      FileDeletionTask task =
          new FileDeletionTask(this, user, subDir, baseDirList);
      recordDeletionTaskInStateStore(task);
      sched.schedule(task, debugDelay, TimeUnit.SECONDS);
    }
  }
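
The scheduling behaviour of delete can be approximated with a plain ScheduledExecutorService. This is a sketch under stated assumptions, not the DeletionService implementation: DelayedDeleter is a hypothetical class, it deletes with java.nio.file directly in the current process, and it skips the state-store bookkeeping and per-user handling that appear in the excerpt.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Hypothetical delayed deleter: schedules a recursive delete after a fixed
// delay, and does nothing at all when the delay is -1.
final class DelayedDeleter implements AutoCloseable {
  private final ScheduledExecutorService sched =
      Executors.newSingleThreadScheduledExecutor();
  private final long delaySeconds;

  DelayedDeleter(long delaySeconds) {
    this.delaySeconds = delaySeconds;
  }

  // Returns the scheduled task, or null when deletion is disabled.
  ScheduledFuture<?> delete(Path dir) {
    if (delaySeconds == -1) {
      return null;
    }
    return sched.schedule(() -> deleteRecursively(dir), delaySeconds,
        TimeUnit.SECONDS);
  }

  // Deletes children before parents by visiting paths in reverse order.
  static void deleteRecursively(Path dir) {
    if (!Files.exists(dir)) {
      return;
    }
    try (Stream<Path> walk = Files.walk(dir)) {
      walk.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public void close() {
    sched.shutdownNow();
  }
}
```

Treating -1 as "never delete" keeps finished applications' directories on disk, which is useful when inspecting them after the fact; any other value only postpones the deletion.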

Reposted from blog.csdn.net/houzhizhen/article/details/79544012