Copyright notice: this is an original article by the author and may not be reproduced without the author's permission. https://blog.csdn.net/zhusirong/article/details/83782823
AMContainerAllocatedTransition (based on Hadoop 2.7.6)
RMAppAttempt state transitions covered previously:
AttemptStartedTransition: NEW -> SUBMITTED
ScheduleTransition: SUBMITTED -> SCHEDULED
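The two transitions above can be pictured as entries in a lookup table keyed by (current state, event). The following is only a minimal sketch of that idea, not the real state machine in RMAppAttemptImpl: the class name `AttemptStateTable` is hypothetical, the event names `START` and `ATTEMPT_ADDED` are recalled from the Hadoop 2.7 source rather than taken from this article, and every other state and arc is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model for illustration only; the real table lives in RMAppAttemptImpl.
class AttemptStateTable {
    enum State { NEW, SUBMITTED, SCHEDULED }

    // Event names are assumptions recalled from the Hadoop 2.7 source.
    enum Event { START, ATTEMPT_ADDED }

    // Maps "state/event" to the state the attempt moves to.
    private static final Map<String, State> TABLE = new HashMap<>();

    static {
        // AttemptStartedTransition: NEW -> SUBMITTED
        TABLE.put(key(State.NEW, Event.START), State.SUBMITTED);
        // ScheduleTransition: SUBMITTED -> SCHEDULED
        TABLE.put(key(State.SUBMITTED, Event.ATTEMPT_ADDED), State.SCHEDULED);
    }

    private static String key(State state, Event event) {
        return state + "/" + event;
    }

    // Returns the next state, or fails if the table has no arc for this pair.
    static State next(State current, Event event) {
        State target = TABLE.get(key(current, event));
        if (target == null) {
            throw new IllegalStateException("No transition for " + key(current, event));
        }
        return target;
    }

    public static void main(String[] args) {
        State state = State.NEW;
        state = next(state, Event.START);
        state = next(state, Event.ATTEMPT_ADDED);
        System.out.println(state); // prints SCHEDULED
    }
}
```

Each arc in this sketch has exactly one target state. The transition discussed in this post differs in that its target is decided at run time.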
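Main steps:
1. Call scheduler.allocate, which fires an RMContainerEventType.ACQUIRED event (its handler, AcquiredTransition, registers the container with containerAllocationExpirer).
2. Fire an RMStateStoreEventType.STORE_APP_ATTEMPT event.
After AMContainerAllocatedTransition has run, the RMAppAttempt moves from SCHEDULED to ALLOCATED_SAVING. If allocate returns no container, the attempt stays in SCHEDULED and the fetch is retried.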
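The control flow of these steps can be sketched without any Hadoop dependency. Everything in the block below is a hypothetical stand-in: `AmAllocationSketch`, its `Scheduler` interface, and the string event names only mimic the shape of the real classes, and containers are plain strings. The point it illustrates is that the transition itself chooses the resulting state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a multi-arc transition; not the Hadoop implementation.
class AmAllocationSketch {
    enum State { SCHEDULED, ALLOCATED_SAVING }

    // Stand-in for the scheduler: returns the containers it can hand over right now.
    interface Scheduler {
        List<String> allocate(String attemptId);
    }

    final String attemptId;
    final List<String> firedEvents = new ArrayList<>(); // events this attempt has emitted
    String masterContainer;                             // set once an AM container is obtained

    AmAllocationSketch(String attemptId) {
        this.attemptId = attemptId;
    }

    // Mirrors the shape of the transition: the return value is the post-transition state.
    State onContainerAllocated(Scheduler scheduler) {
        List<String> containers = scheduler.allocate(attemptId);
        if (containers.isEmpty()) {
            // Nothing could be fetched: ask to be invoked again and stay where we are.
            firedEvents.add("CONTAINER_ALLOCATED");
            return State.SCHEDULED;
        }
        // The AM needs exactly one container, so take the first.
        masterContainer = containers.get(0);
        firedEvents.add("STORE_APP_ATTEMPT");
        return State.ALLOCATED_SAVING;
    }

    public static void main(String[] args) {
        AmAllocationSketch attempt = new AmAllocationSketch("attempt_1");
        // First call: the scheduler has nothing to hand over yet.
        System.out.println(attempt.onContainerAllocated(id -> Collections.<String>emptyList()));
        // Second call: one container is available.
        System.out.println(attempt.onContainerAllocated(id -> Collections.singletonList("container_1")));
        System.out.println(attempt.firedEvents);
    }
}
```

Running `main` exercises both arcs in order: the empty allocation keeps the attempt in SCHEDULED, and the second call records the container and moves on to ALLOCATED_SAVING.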
/**
 * Main logic:
 * 1. Call scheduler.allocate.
 * 2. Fire an RMStateStoreEventType.STORE_APP_ATTEMPT event.
 */
private static final class AMContainerAllocatedTransition
    implements
    MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
  @Override
  public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
    // Acquire the AM container from the scheduler.
    /*
     * Inside allocate:
     * 1. Release containers that have already completed.
     * 2. Merge the new asks into the requests map
     *    (Map<Priority, Map<String, ResourceRequest>>) of the application (FiCaSchedulerApp).
     * 3. Call getAllocation, which
     *    3.1 creates an NMToken for each allocated container;
     *    3.2 fires RMContainerEventType.ACQUIRED, handled by AcquiredTransition,
     *        which registers the container with containerAllocationExpirer.
     */
    Allocation amContainerAllocation =
        appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
            null);
    // allocate may return no container even though CONTAINER_ALLOCATED was
    // received, for example when creating a token for the container failed.
    if (amContainerAllocation.getContainers().size() == 0) {
      // Re-fire RMAppAttemptEventType.CONTAINER_ALLOCATED so that the fetch
      // is attempted again, and stay in SCHEDULED.
      appAttempt.retryFetchingAMContainer(appAttempt);
      return RMAppAttemptState.SCHEDULED;
    }
    // The AM needs exactly one container, so take the first.
    appAttempt.setMasterContainer(amContainerAllocation.getContainers()
        .get(0));
    RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
        .getRMContainer(appAttempt.getMasterContainer().getId());
    // Mark this container as the AM container.
    rmMasterContainer.setAMContainer(true);
    // Clear the record of nodes whose NMToken was already issued for this
    // attempt, so that the AM's own later allocate calls can obtain the tokens.
    appAttempt.rmContext.getNMTokenSecretManager()
        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
    // Record the resource that was actually allocated to the AM container.
    appAttempt.getSubmissionContext().setResource(
        appAttempt.getMasterContainer().getResource());
    // Fire RMStateStoreAppAttemptEvent (RMStateStoreEventType.STORE_APP_ATTEMPT).
    appAttempt.storeAttempt();
    return RMAppAttemptState.ALLOCATED_SAVING;
  }
}
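The retry branch relies on retryFetchingAMContainer, whose body is not shown in this post. One plausible way to implement such a retry, and as far as I recall roughly what Hadoop does, is to start a short-lived thread that waits briefly and then re-posts the CONTAINER_ALLOCATED event, so the dispatcher thread is never blocked. The sketch below shows that pattern under those assumptions: `RetryFetchSketch`, `EVENT_QUEUE`, `retryLater`, `awaitEvent`, and the delay values are all hypothetical, and a plain queue stands in for the RM event dispatcher.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "wait a little, then re-post the event".
class RetryFetchSketch {
    // Stand-in for the event dispatcher's queue.
    static final BlockingQueue<String> EVENT_QUEUE = new LinkedBlockingQueue<>();

    // Starts a helper thread so the caller returns immediately; the event is
    // re-posted after delayMillis.
    static Thread retryLater(final String eventType, final long delayMillis) {
        Thread helper = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and re-post right away.
                Thread.currentThread().interrupt();
            }
            EVENT_QUEUE.add(eventType);
        });
        helper.setDaemon(true);
        helper.start();
        return helper;
    }

    // Waits up to timeoutMillis for the next event; returns null on timeout or interrupt.
    static String awaitEvent(long timeoutMillis) {
        try {
            return EVENT_QUEUE.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        retryLater("CONTAINER_ALLOCATED", 100L);
        System.out.println("re-dispatched: " + awaitEvent(2000L));
    }
}
```

Because the transition returns SCHEDULED before the event is re-posted, the state machine sees the retried CONTAINER_ALLOCATED in the same state as the first one and simply runs AMContainerAllocatedTransition again.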