AM Startup: AMLauncher (Part 6)

Copyright notice: this is an original article by the blog author and may not be reproduced without the author's permission. https://blog.csdn.net/zhusirong/article/details/83792122

AMLauncher.launch (based on Hadoop 2.7.6)

AMLauncher has two main methods, launch and cleanup, which implement the handling of the LAUNCH and CLEANUP events respectively.

1. For a LAUNCH event, the launch method calls the RPC method startContainers to start the container.
2. It then fires the RMAppAttemptEventType.LAUNCHED event.
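
The two steps above, together with the failure path in the run() method shown further down, can be sketched in isolation. This is only a minimal illustration: LaunchFlowSketch, AttemptEventType, LaunchAction and handleLaunch are hypothetical stand-ins invented for this example, not the real YARN classes, and the real code dispatches RMAppAttemptEvent objects to an event handler rather than returning a list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT the real YARN classes.
class LaunchFlowSketch {

    // Mirrors the two outcomes that run() reports for a LAUNCH event.
    enum AttemptEventType { LAUNCHED, LAUNCH_FAILED }

    // Hypothetical stand-in for the launch() step, which may fail.
    interface LaunchAction {
        void launch() throws Exception;
    }

    // Runs the launch step and records which event would be sent to the handler.
    static List<AttemptEventType> handleLaunch(LaunchAction action) {
        List<AttemptEventType> events = new ArrayList<>();
        try {
            action.launch();                            // step 1: start the AM container
            events.add(AttemptEventType.LAUNCHED);      // step 2: report success
        } catch (Exception e) {
            events.add(AttemptEventType.LAUNCH_FAILED); // any failure is reported instead
        }
        return events;
    }

    public static void main(String[] args) {
        // Successful launch: prints [LAUNCHED]
        System.out.println(handleLaunch(() -> { }));
        // Failing launch: prints [LAUNCH_FAILED]
        System.out.println(handleLaunch(() -> {
            throw new java.io.IOException("NodeManager unreachable");
        }));
    }
}
```

The point of the sketch is that LAUNCHED is only sent after the launch step returns normally, so the two events are mutually exclusive for a single attempt.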

public class AMLauncher implements Runnable {
...omitted...
@SuppressWarnings("unchecked")
  public void run() {
    switch (eventType) {
    case LAUNCH:
      try {
        LOG.info("Launching master" + application.getAppAttemptId());
        /**
        *    Start the container, mainly through the startContainers RPC
        */
        launch();
        /**
        *    Fire the RMAppAttemptEventType.LAUNCHED event
        */
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));
      } catch(Exception ie) {
        String message = "Error launching " + application.getAppAttemptId()
            + ". Got exception: " + StringUtils.stringifyException(ie);
        LOG.info(message);
        handler.handle(new RMAppAttemptEvent(application
            .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
      }
      break;
    case CLEANUP:
      try {
        LOG.info("Cleaning master " + application.getAppAttemptId());
        cleanup();
      } catch(IOException ie) {
        LOG.info("Error cleaning master ", ie);
      } catch (YarnException e) {
        StringBuilder sb = new StringBuilder("Container ");
        sb.append(masterContainer.getId().toString());
        sb.append(" is not handled by this NodeManager");
        if (!e.getMessage().contains(sb.toString())) {
          LOG.info("Error cleaning master ", e);          
        }
      }
      break;
    default:
      LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
      break;
    }
  }
  
...omitted...
}
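  /**
   * Communicates with the target NodeManager and asks it to start the ApplicationMaster.
   * The process is straightforward: create a client for the ContainerManagementProtocol,
   * connect to the NodeManager, package everything needed to start the AM (launch command,
   * JAR files, environment variables, and so on) into a StartContainerRequest, and send it
   * to the NodeManager through the startContainers RPC.
   * @throws IOException
   * @throws YarnException
   */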
  private void launch() throws IOException, YarnException {
    /**
     * Connect to the NodeManager
     */
    connect();
    ContainerId masterContainerID = masterContainer.getId();
    ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
    LOG.info("Setting up container " + masterContainer
        + " for AM " + application.getAppAttemptId());  
    ContainerLaunchContext launchContext =
        createAMContainerLaunchContext(applicationContext, masterContainerID);
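    /**
     * StartContainerRequest contains the following two fields:
     *   1. container_launch_context: describes the execution environment, with these main fields:
     *        localResources: local resources the container needs, such as dictionary files, JARs, or executables, stored as key/value pairs
     *        tokens: the security tokens the container needs at run time
     *        service_data: data for auxiliary services, stored as key/value pairs
     *        environment: environment variables for the container, stored as key/value pairs
     *        command: the command that launches the container; it must be a shell command
     *        application_acls: the application's access control lists, stored as key/value pairs
     *   2. container_token: the security token used when starting the container
     */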
    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    /**
     * Invokes ContainerManagerImpl.startContainers on the NodeManager
     */
    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(masterContainerID)) {
      Throwable t =response.getFailedRequests().get(masterContainerID).deSerialize();
      parseAndThrowException(t);
    } else {
      LOG.info("Done launching container " + masterContainer + " for AM "
          + application.getAppAttemptId());
    }
  }
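
The shape of launch() (wrap one request in a list, send the batch, then look for a failure entry keyed by the master container's ID) can be tried out without a cluster. The following is a rough sketch under stated assumptions: Request, Response and the local startContainers method are hypothetical stand-ins, the "empty token is rejected" rule is invented purely to produce a failure, and an unchecked IllegalStateException replaces the IOException/YarnException thrown by the real method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; these are NOT the real YARN records.
class StartContainersSketch {

    // Stand-in for StartContainerRequest: a launch command plus a container token.
    static final class Request {
        final String containerId;
        final String command;
        final String token;

        Request(String containerId, String command, String token) {
            this.containerId = containerId;
            this.command = command;
            this.token = token;
        }
    }

    // Stand-in for StartContainersResponse: failures keyed by container ID.
    static final class Response {
        final Map<String, String> failedRequests = new HashMap<>();
    }

    // Hypothetical NodeManager side: rejects any request whose token is empty.
    static Response startContainers(List<Request> requests) {
        Response response = new Response();
        for (Request r : requests) {
            if (r.token.isEmpty()) {
                response.failedRequests.put(r.containerId, "invalid container token");
            }
        }
        return response;
    }

    // Same shape as launch(): batch of one, send, then check for a failure entry.
    static void launch(Request amRequest) {
        List<Request> batch = new ArrayList<>();
        batch.add(amRequest);
        Response response = startContainers(batch);
        String failure = response.failedRequests.get(amRequest.containerId);
        if (failure != null) {
            throw new IllegalStateException(
                "Failed to start " + amRequest.containerId + ": " + failure);
        }
    }

    public static void main(String[] args) {
        launch(new Request("container_01", "java AppMaster", "token-abc"));
        System.out.println("Done launching container_01");
        try {
            launch(new Request("container_02", "java AppMaster", ""));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the RPC accepts a batch, a normal return does not by itself mean every container started; the caller has to inspect the per-container failure map, which is what the check on getFailedRequests() does in the real method.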
