版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhusirong/article/details/83621436
AttemptStartedTransition(基于hadoop 2.7.6)
主要逻辑:
1.向ApplicationMasterService注册AppAttempt(在ApplicationMasterService服务中以attemptId为键占个坑,后面ApplicationMaster通过allocate发送心跳时,会检查其对应的attempt是否注册过.)
2.触发SchedulerEventType.APP_ATTEMPT_ADDED事件,该事件由CapacityScheduler.handler方法处理.
代码清单
private static final class RMAppAttemptImpl.AttemptStartedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
/**
* 是否从之前的状态恢复,默认写死了是false
*/
boolean transferStateFromPreviousAttempt = false;
if (event instanceof RMAppStartAttemptEvent) {
transferStateFromPreviousAttempt =
((RMAppStartAttemptEvent) event)
.getTransferStateFromPreviousAttempt();
}
appAttempt.startTime = System.currentTimeMillis();
/**
*1.实例化应答类AllocateResponse,将ResponseId初始化-1.
*2.在ApplicationMasterService服务中根据attemptId占个坑,后面ApplicationMaster通过allocate发送心跳时,会检查其对应的attempt是否注册过.
*/
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.clientTokenMasterKey =
appAttempt.rmContext.getClientToAMTokenSecretManager().createMasterKey(appAttempt.applicationAttemptId);
}
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
}
}
这里的masterService服务是随着ResourceManager初始化的,就是ApplicationMasterService类,见ResourceManager的初始化代码:
ResourceManager.RMActiveService.serviceInit(){
...省略...
masterService = createApplicationMasterService();
addService(masterService) ;
rmContext.setApplicationMasterService(masterService);
...省略...
}
SchedulerEventType.APP_ATTEMPT_ADDED的处理器是哪个,可以参考AddApplicationToSchedulerTransition(详见4)的思路去定位代码.