rmAppManager.submitApplication in Detail
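First, two preliminary points:
1. ResourceManager has a main method; it is started by the Hadoop startup scripts.
2. rmAppManager lives on the RM side. Before reaching rmAppManager.submitApplication, the submission starts on the client, so here is the method chain a client goes through to submit an application: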
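1. job.waitForCompletion (probably familiar: this is how you normally submit a job when writing your own MapReduce program) ->
2. job.submit ->
3. JobSubmitter.submitJobInternal (input splits are computed by the writeSplits method) ->
4. (ClientProtocol) submitClient.submitJob (the protocol implementation is YARNRunner) ->
5. ResourceMgrDelegate.submitApplication ->
6. (YarnClient) client.submitApplication (the implementation is YarnClientImpl) ->
---- From step 7 onward we are on the RM side ----
7. (ApplicationClientProtocol) rmClient.submitApplication (an RPC whose server-side implementation is ClientRMService) ->
8. rmAppManager.submitApplication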
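To make the chain above concrete, here is a toy model in plain Java that assumes nothing from Hadoop: each hypothetical layer records its name and delegates to the next, in the order listed. It starts at Job.submit (waitForCompletion calls submit and then monitors the job) and collapses the RPC hop between rmClient and ClientRMService into a direct call. The names are only strings; none of this is the Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the client-to-RM submission chain. The layer names are plain strings
// that echo the Hadoop classes; nothing here uses or imitates the real Hadoop API.
interface SubmitStep {
    void submit(String appName, List<String> trace);
}

final class SubmissionChainSketch {
    // Build the chain from the RM end backwards so each layer wraps the next one.
    static SubmitStep build() {
        SubmitStep rmAppManager = (app, trace) -> trace.add("RMAppManager.submitApplication");
        // In reality this hop is an RPC from rmClient to ClientRMService; here it is a direct call.
        SubmitStep clientRMService = layer("ClientRMService.submitApplication", rmAppManager);
        SubmitStep yarnClient = layer("YarnClientImpl.submitApplication", clientRMService);
        SubmitStep delegate = layer("ResourceMgrDelegate.submitApplication", yarnClient);
        SubmitStep yarnRunner = layer("YARNRunner.submitJob", delegate);
        SubmitStep jobSubmitter = layer("JobSubmitter.submitJobInternal", yarnRunner);
        return layer("Job.submit", jobSubmitter);
    }

    // A layer records its own name, then delegates to the next layer.
    private static SubmitStep layer(String name, SubmitStep next) {
        return (app, trace) -> {
            trace.add(name);
            next.submit(app, trace);
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build().submit("wordcount", trace);
        trace.forEach(System.out::println);
    }
}
```

Each layer only knows the interface of the next one, which is why the real chain can swap implementations (YARNRunner, YarnClientImpl) behind ClientProtocol and YarnClient.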
What submitApplication does, in brief:
1. Creates an RMAppImpl and puts it into the ResourceManager's context, rmContext.
2. Fires an RMAppEventType.START event.
Code listing
@SuppressWarnings("unchecked")
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user) throws YarnException {
  ApplicationId applicationId = submissionContext.getApplicationId();
  /**
   * Create the RMAppImpl and put it into the ResourceManager context rmContext
   */
  RMAppImpl application =
      createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
  ApplicationId appId = submissionContext.getApplicationId();
  /**
   * If Kerberos authentication (security) is enabled
   */
  if (UserGroupInformation.isSecurityEnabled()) {
    try {
      // In this branch START is not fired here: the DelegationTokenRenewer
      // fires it asynchronously once the application's tokens are handled.
      this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
          parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete(),
          application.getUser());
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials.", e);
      assert application.getState() == RMAppState.NEW;
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId,
              RMAppEventType.APP_REJECTED, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  } else {
    /**
     * Fire an RMAppEventType.START event
     */
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
  }
}
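The two branches of submitApplication are easy to misread: with security off, START is dispatched right there; with Kerberos on, the app is handed to the DelegationTokenRenewer, which (in Hadoop 2.x) fires START itself after handling the tokens, while a credential-parsing failure produces APP_REJECTED plus an exception. The sketch below models that control flow only. SubmitBranchSketch, its parseCredentials and addApplicationAsync stand-ins, and the AppEvent enum are hypothetical, and the renewer runs inline rather than asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event types for this sketch only (the real enum is RMAppEventType).
enum AppEvent { START, APP_REJECTED }

// Models only the control flow of submitApplication; not Hadoop code.
final class SubmitBranchSketch {
    final List<AppEvent> dispatched = new ArrayList<>();

    // Stand-in for parseCredentials: fails when there is nothing to parse.
    private static String parseCredentials(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("Unable to parse credentials.");
        }
        return raw;
    }

    // Stand-in for DelegationTokenRenewer.addApplicationAsync. The real call is
    // asynchronous; here it runs inline and fires START after "handling" the tokens.
    private void addApplicationAsync(String tokens) {
        dispatched.add(AppEvent.START);
    }

    void submit(boolean securityEnabled, String rawCredentials) {
        if (securityEnabled) {
            try {
                addApplicationAsync(parseCredentials(rawCredentials));
            } catch (IllegalArgumentException e) {
                // As in the original: reject the app, then rethrow to the client.
                dispatched.add(AppEvent.APP_REJECTED);
                throw new IllegalStateException(e.getMessage(), e);
            }
        } else {
            // Non-secure path: START is dispatched directly.
            dispatched.add(AppEvent.START);
        }
    }

    public static void main(String[] args) {
        SubmitBranchSketch s = new SubmitBranchSketch();
        s.submit(false, null);
        System.out.println(s.dispatched);
    }
}
```

Either way the application ends up receiving exactly one START (or one APP_REJECTED), which is what lets the rest of the walkthrough follow a single START event.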
private RMAppImpl createAndPopulateNewRMApp(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user, boolean isRecovery) throws YarnException {
  ApplicationId applicationId = submissionContext.getApplicationId();
  ResourceRequest amReq =
      validateAndCreateResourceRequest(submissionContext, isRecovery);
  // Create RMApp
  RMAppImpl application =
      new RMAppImpl(applicationId, rmContext, this.conf,
          submissionContext.getApplicationName(), user,
          submissionContext.getQueue(),
          submissionContext, this.scheduler, this.masterService,
          submitTime, submissionContext.getApplicationType(),
          submissionContext.getApplicationTags(), amReq);
  // Concurrent app submissions with same applicationId will fail here
  // Concurrent app submissions with different applicationIds will not
  // influence each other
  /**
   * Put the newly created application into the ResourceManager context rmContext
   */
  if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
      null) {
    String message = "Application with id " + applicationId
        + " is already present! Cannot add a duplicate!";
    LOG.warn(message);
    throw new YarnException(message);
  }
  this.applicationACLsManager.addApplication(applicationId,
      submissionContext.getAMContainerSpec().getApplicationACLs());
  String appViewACLs = submissionContext.getAMContainerSpec()
      .getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
  rmContext.getSystemMetricsPublisher().appACLsUpdated(
      application, appViewACLs, System.currentTimeMillis());
  return application;
}
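The duplicate check in createAndPopulateNewRMApp relies on ConcurrentMap.putIfAbsent being atomic: among several concurrent submissions with the same applicationId, exactly one sees null and wins, while submissions with different IDs never interfere. A minimal sketch, assuming a plain ConcurrentHashMap<String, Object> as a stand-in for rmContext.getRMApps() and a hypothetical PutIfAbsentSketch class:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class PutIfAbsentSketch {
    // Stand-in for rmContext.getRMApps(): application id -> application object.
    private final ConcurrentMap<String, Object> apps = new ConcurrentHashMap<>();

    // Mirrors the check in createAndPopulateNewRMApp: only the first put for an id succeeds.
    void register(String appId, Object app) {
        if (apps.putIfAbsent(appId, app) != null) {
            throw new IllegalStateException("Application with id " + appId
                + " is already present! Cannot add a duplicate!");
        }
    }

    // Fires `threads` registrations of the same id at once; returns how many succeeded.
    static int raceSameId(int threads) {
        PutIfAbsentSketch sketch = new PutIfAbsentSketch();
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    go.await(); // line all threads up before they race
                    sketch.register("application_1_0001", new Object());
                    successes.incrementAndGet();
                } catch (IllegalStateException | InterruptedException e) {
                    // Duplicate id (or interrupted): this submission loses.
                }
            });
            workers[i].start();
        }
        go.countDown();
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(raceSameId(8)); // prints 1
    }
}
```

Using putIfAbsent instead of a separate get followed by put closes the race window between the check and the insert without any explicit locking.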
The key step here is firing the RMAppEventType.START event, which leaves two questions to follow up:
1. Who handles the RMAppEventType.START event?
2. How is it handled?
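To answer them, we need to know where the corresponding EventHandler and Dispatcher come from. They are set up in ResourceManager.serviceInit (recall that ResourceManager has a main method; once the cluster is set up, the ResourceManager process is started by a script). For the underlying design in more depth, look up material on YARN's state machines and event dispatchers.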
ResourceManager.serviceInit code listing
protected void serviceInit(Configuration conf) throws Exception {
  // ... omitted ...
  this.rmContext = new RMContextImpl();
  // ...
  createAndInitActiveServices();
  // ... omitted ...
}
createAndInitActiveServices() code listing
protected void createAndInitActiveServices() throws Exception {
  activeServices = new RMActiveServices(this);
  activeServices.init(conf);
}
activeServices.init is inherited from the base class AbstractService, which calls back into the concrete subclass's serviceInit method, so next we look at RMActiveServices.serviceInit.
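The init/serviceInit split is the template-method pattern: the base class owns the lifecycle bookkeeping and calls a protected hook that subclasses override. A minimal sketch with hypothetical MiniAbstractService and MiniActiveServices classes, using a Map as a stand-in for Configuration; Hadoop's real AbstractService also handles listeners, failure states and more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical mini lifecycle; the names echo Hadoop's, but this is not Hadoop code.
abstract class MiniAbstractService {
    enum State { NOTINITED, INITED }

    private State state = State.NOTINITED;

    // Final entry point: the base class guards the lifecycle, then calls the subclass hook.
    public final synchronized void init(Map<String, String> conf) {
        if (state == State.INITED) {
            return; // repeated init calls are ignored in this sketch
        }
        serviceInit(conf);
        state = State.INITED;
    }

    public synchronized State getState() {
        return state;
    }

    // Hook for subclasses: the "callback" that init() invokes.
    protected void serviceInit(Map<String, String> conf) {
    }
}

final class MiniActiveServices extends MiniAbstractService {
    final List<String> registered = new ArrayList<>();

    @Override
    protected void serviceInit(Map<String, String> conf) {
        // In the RM, roughly where dispatchers, RMAppManager and ClientRMService get wired up.
        registered.add("ApplicationEventDispatcher");
        registered.add("RMAppManager");
        registered.add("ClientRMService");
    }

    public static void main(String[] args) {
        MiniActiveServices services = new MiniActiveServices();
        services.init(Map.of());
        System.out.println(services.getState() + " " + services.registered);
    }
}
```

Making init final keeps the state checks in one place, so subclasses such as RMActiveServices only describe what to build.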
RMActiveServices.serviceInit code listing
protected void serviceInit(Configuration configuration) throws Exception {
  // ... omitted ...
  /**
   * Register ApplicationEventDispatcher as the handler for RMAppEventType events.
   * It forwards each event to RMAppImpl.handle. The rmContext used here is the one
   * instantiated in ResourceManager.serviceInit.
   */
  rmDispatcher.register(RMAppEventType.class,
      new ApplicationEventDispatcher(rmContext));
  rmDispatcher.register(RMAppAttemptEventType.class,
      new ApplicationAttemptEventDispatcher(rmContext));
  // ... omitted ...
  /**
   * rmAppManager is also created at this point
   */
  rmAppManager = createRMAppManager();
  rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);
  // ... omitted ...
}
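rmDispatcher is by default an AsyncDispatcher, which keeps a map from an event-type enum class to its handler and picks the handler via event.getType().getDeclaringClass(). The sketch below models only that lookup. MiniDispatcher, its Event class and the two enums are hypothetical, and it drains its queue on the caller's thread, whereas the real dispatcher uses a dedicated thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Two hypothetical event-type enums that share a constant name on purpose.
enum AppEvents { START, APP_REJECTED }
enum AttemptEvents { START, LAUNCHED }

// Single-threaded sketch of a dispatcher keyed by the event type's enum class.
final class MiniDispatcher {
    static final class Event {
        final Enum<?> type;
        final String appId;

        Event(Enum<?> type, String appId) {
            this.type = type;
            this.appId = appId;
        }
    }

    private final Map<Class<?>, Consumer<Event>> handlers = new HashMap<>();
    private final Queue<Event> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Event> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Producers only enqueue; handlers run later when the queue is drained.
    void handle(Event event) {
        queue.add(event);
    }

    // Drains on the caller's thread (the real AsyncDispatcher uses its own thread).
    void drain() {
        Event e;
        while ((e = queue.poll()) != null) {
            Consumer<Event> handler = handlers.get(e.type.getDeclaringClass());
            if (handler == null) {
                throw new IllegalStateException("No handler for " + e.type);
            }
            handler.accept(e);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniDispatcher d = new MiniDispatcher();
        d.register(AppEvents.class, ev -> log.add("app " + ev.appId + ": " + ev.type));
        d.register(AttemptEvents.class, ev -> log.add("attempt " + ev.appId + ": " + ev.type));
        d.handle(new Event(AppEvents.START, "app_1"));
        d.handle(new Event(AttemptEvents.START, "app_1"));
        d.drain();
        log.forEach(System.out::println);
    }
}
```

Keying on getDeclaringClass() rather than getClass() matters for enum constants that have their own bodies, whose getClass() is an anonymous subclass; it also means two START constants from different enums reach different handlers.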
As the code above shows, events of type RMAppEventType are handled by ApplicationEventDispatcher:
@Private
public static final class ApplicationEventDispatcher implements
    EventHandler<RMAppEvent> {

  private final RMContext rmContext;

  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    if (rmApp != null) {
      try {
        rmApp.handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + appID, t);
      }
    }
  }
}
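ApplicationEventDispatcher.handle looks up the RMApp registered under the event's application ID in rmContext (the RMAppImpl that createAndPopulateNewRMApp put there) and calls its handle method, so RMAppEventType events are ultimately processed by RMAppImpl.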
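ApplicationEventDispatcher adds a second level of routing: the type-level dispatcher hands it every RMAppEventType event, and it then picks the target application by ID. Two details are worth noticing in its handle method: an event for an unknown ID is silently dropped, and an exception thrown by the application is logged rather than propagated. A minimal sketch of that behavior, assuming a HashMap of hypothetical Consumer<String> "applications" in place of rmContext.getRMApps():

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of ApplicationEventDispatcher's per-application routing.
final class PerAppRouterSketch {
    // Stand-in for rmContext.getRMApps(): application id -> that app's event handler.
    final Map<String, Consumer<String>> apps = new HashMap<>();
    int errors = 0;

    // Returns true if an application with this id received the event.
    boolean route(String appId, String eventType) {
        Consumer<String> app = apps.get(appId);
        if (app == null) {
            return false; // unknown id: dropped silently, as in the original
        }
        try {
            app.accept(eventType);
        } catch (RuntimeException t) {
            // The original catches Throwable and logs it; this sketch just counts.
            errors++;
        }
        return true;
    }

    public static void main(String[] args) {
        PerAppRouterSketch router = new PerAppRouterSketch();
        router.apps.put("app_1", ev -> System.out.println("app_1 got " + ev));
        System.out.println(router.route("app_1", "START"));
        System.out.println(router.route("app_2", "START"));
    }
}
```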
So we look in RMAppImpl for how this event is handled. RMAppImpl code listing:
private static final StateMachineFactory<RMAppImpl, RMAppState,
                                         RMAppEventType, RMAppEvent> stateMachineFactory
    = new StateMachineFactory<RMAppImpl, RMAppState,
                              RMAppEventType, RMAppEvent>(RMAppState.NEW)
    // ... omitted ...
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    // ... omitted ...
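That is, when the application is in state NEW, the RMAppEventType.START event is handled by the transition RMAppNewlySavingTransition, which moves the application to NEW_SAVING.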
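StateMachineFactory is essentially a transition table keyed by (current state, event type), where each arc can carry a hook such as RMAppNewlySavingTransition (which, in the Hadoop source, stores the new application in the RM state store). The sketch below is a hypothetical miniature, MiniStateMachine, with trimmed-down AppState/AppEventType enums and Runnable hooks as stand-ins for the real transition classes. A second arc, NEW_SAVING to SUBMITTED on APP_NEW_SAVED, is included to show how successive events advance the app.

```java
import java.util.EnumMap;
import java.util.Map;

// Trimmed-down, hypothetical copies of the RM enums (the real ones have many more values).
enum AppState { NEW, NEW_SAVING, SUBMITTED }
enum AppEventType { START, APP_NEW_SAVED }

// Minimal transition table: (current state, event type) -> (next state, hook).
final class MiniStateMachine {
    private static final class Arc {
        final AppState to;
        final Runnable hook;

        Arc(AppState to, Runnable hook) {
            this.to = to;
            this.hook = hook;
        }
    }

    private final Map<AppState, Map<AppEventType, Arc>> table = new EnumMap<>(AppState.class);
    private AppState current = AppState.NEW;

    // Returns this so calls can be chained, like the real factory's addTransition.
    MiniStateMachine addTransition(AppState from, AppState to, AppEventType on, Runnable hook) {
        table.computeIfAbsent(from, k -> new EnumMap<>(AppEventType.class)).put(on, new Arc(to, hook));
        return this;
    }

    // Runs the arc's hook, then moves to the arc's target state.
    AppState doTransition(AppEventType event) {
        Map<AppEventType, Arc> row = table.get(current);
        Arc arc = (row == null) ? null : row.get(event);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + event + " at state " + current);
        }
        arc.hook.run();
        current = arc.to;
        return current;
    }

    public static void main(String[] args) {
        MiniStateMachine app = new MiniStateMachine()
            .addTransition(AppState.NEW, AppState.NEW_SAVING, AppEventType.START,
                () -> System.out.println("store new application"))
            .addTransition(AppState.NEW_SAVING, AppState.SUBMITTED, AppEventType.APP_NEW_SAVED,
                () -> System.out.println("add application to scheduler"));
        System.out.println(app.doTransition(AppEventType.START));
        System.out.println(app.doTransition(AppEventType.APP_NEW_SAVED));
    }
}
```

An event with no registered arc for the current state throws, much as the real factory raises an invalid-transition exception, so a second START arriving in NEW_SAVING is rejected rather than silently re-running the save.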