版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhusirong/article/details/83792122
AMLauncher.launch(基于hadoop 2.7.6)
主要有launch、cleanup方法,launch和cleanup方法分别实现了处理LAUNCH和CLEANUP事件的逻辑.
1.对于LAUNCH事件,launch方法会调用rpc方法startContainers启动Container
2.launch方法成功返回后,触发RMAppAttemptEventType.LAUNCHED事件;如果过程中抛出异常,则触发RMAppAttemptEventType.LAUNCH_FAILED事件.
public class AMLauncher implements Runnable {
...省略...
/**
 * Processes the launcher event this instance was created for.
 *
 * <ul>
 *   <li>{@code LAUNCH}: calls {@link #launch()} and, if it returns normally,
 *       sends {@code RMAppAttemptEventType.LAUNCHED} to the handler. Any
 *       exception raised inside the try block is logged and reported as
 *       {@code RMAppAttemptEventType.LAUNCH_FAILED}, carrying the
 *       stringified exception as the diagnostic message.</li>
 *   <li>{@code CLEANUP}: calls {@code cleanup()}. Failures are only logged;
 *       no event is sent.</li>
 *   <li>Any other event type is logged and ignored.</li>
 * </ul>
 */
@Override
@SuppressWarnings("unchecked")
public void run() {
  switch (eventType) {
  case LAUNCH:
    try {
      LOG.info("Launching master " + application.getAppAttemptId());
      // Start the master container through the startContainers RPC.
      launch();
      // Reached only when launch() did not throw: report success.
      handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
          RMAppAttemptEventType.LAUNCHED));
    } catch (Exception ie) {
      String message = "Error launching " + application.getAppAttemptId()
          + ". Got exception: " + StringUtils.stringifyException(ie);
      LOG.info(message);
      handler.handle(new RMAppAttemptEvent(application
          .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
    }
    break;
  case CLEANUP:
    try {
      LOG.info("Cleaning master " + application.getAppAttemptId());
      cleanup();
    } catch (IOException ie) {
      LOG.info("Error cleaning master ", ie);
    } catch (YarnException e) {
      String notHandledMarker = "Container " + masterContainer.getId().toString()
          + " is not handled by this NodeManager";
      // getMessage() may be null; treat that as "not the expected error"
      // and log it instead of failing with a NullPointerException.
      String detail = e.getMessage();
      // The "not handled" error is deliberately not logged
      // (presumably the container is already gone on that node - TODO confirm).
      if (detail == null || !detail.contains(notHandledMarker)) {
        LOG.info("Error cleaning master ", e);
      }
    }
    break;
  default:
    LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
    break;
  }
}
...省略...
}
/**
 * Asks the NodeManager to start the master container for this attempt.
 *
 * <p>Steps: connect to the NodeManager, build the container launch context
 * from the application's submission context, wrap it together with the
 * master container's token in a {@code StartContainerRequest}, and send it
 * through the {@code startContainers} RPC. If the response lists the master
 * container among the failed requests, the reported failure is deserialized
 * and passed to {@code parseAndThrowException}.
 *
 * @throws IOException declared by this method; raised by the calls it makes
 * @throws YarnException declared by this method; raised by the calls it makes
 */
private void launch() throws IOException, YarnException {
  // Connect to the NodeManager before anything else.
  connect();

  ContainerId amContainerId = masterContainer.getId();
  ApplicationSubmissionContext submissionContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + masterContainer
      + " for AM " + application.getAppAttemptId());
  ContainerLaunchContext amLaunchContext =
      createAMContainerLaunchContext(submissionContext, amContainerId);

  /*
   * A StartContainerRequest carries two fields:
   *   1. container_launch_context - the execution environment:
   *        localResources:   local resources the container needs (dictionary
   *                          files, jars, executables, ...), as key/value pairs
   *        tokens:           tokens the container needs at run time
   *        service_data:     data for auxiliary services, as key/value pairs
   *        environment:      environment variables, as key/value pairs
   *        command:          the command to run; must be a shell command
   *        application_acls: application access control lists, as key/value pairs
   *   2. container_token - the security token used to start the container
   */
  List<StartContainerRequest> requests = new ArrayList<StartContainerRequest>();
  requests.add(StartContainerRequest.newInstance(
      amLaunchContext, masterContainer.getContainerToken()));

  // RPC to the NodeManager (per the original note, this ends up in
  // ContainerManagerImpl.startContainers).
  StartContainersResponse response = containerMgrProxy.startContainers(
      StartContainersRequest.newInstance(requests));

  boolean masterContainerFailed = response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(amContainerId);
  if (!masterContainerFailed) {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
    return;
  }

  // Rebuild the failure reported for the master container and hand it on.
  Throwable cause =
      response.getFailedRequests().get(amContainerId).deSerialize();
  parseAndThrowException(cause);
}