Copyright notice: This is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/zhusirong/article/details/83621048
StartAppAttemptTransition (based on Hadoop 2.7.6)
StartAppAttemptTransition is the transition that handles the RMAppEventType.APP_ACCEPTED event. After it runs, the RMApp state changes from SUBMITTED to ACCEPTED.
Main logic:
1. Create an RMAppAttempt. RMAppImpl maintains an <ApplicationAttemptId, RMAppAttempt> map.
2. Fire an RMAppStartAttemptEvent (RMAppAttemptEventType.START). This event is handled by AttemptStartedTransition.
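The two steps above can be sketched with a small self-contained model. This is only an illustration, not Hadoop code: StartAttemptSketch, MiniApp, MiniAttempt, StartAttemptEvent and Handler are hypothetical stand-ins, and the attempt id is a plain string rather than an ApplicationAttemptId.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the flow described above; this is NOT Hadoop code.
// Every name below (StartAttemptSketch, MiniApp, MiniAttempt, ...) is hypothetical.
class StartAttemptSketch {
    enum AppState { SUBMITTED, ACCEPTED }

    /** Stand-in for RMAppAttempt: only remembers its own id. */
    static final class MiniAttempt {
        final String attemptId;
        MiniAttempt(String attemptId) { this.attemptId = attemptId; }
    }

    /** Stand-in for RMAppStartAttemptEvent: carries the id of the attempt to start. */
    static final class StartAttemptEvent {
        final String attemptId;
        StartAttemptEvent(String attemptId) { this.attemptId = attemptId; }
    }

    /** Stand-in for the event handler that the application posts events to. */
    interface Handler {
        void handle(StartAttemptEvent event);
    }

    static final class MiniApp {
        final String appId;
        final Handler handler;
        // Mirrors the <ApplicationAttemptId, RMAppAttempt> map, keyed by a plain string id.
        final Map<String, MiniAttempt> attempts = new LinkedHashMap<>();
        AppState state = AppState.SUBMITTED;
        MiniAttempt currentAttempt;

        MiniApp(String appId, Handler handler) {
            this.appId = appId;
            this.handler = handler;
        }

        /** Models the reaction to APP_ACCEPTED while in SUBMITTED. */
        void onAppAccepted() {
            if (state != AppState.SUBMITTED) {
                throw new IllegalStateException("unexpected state: " + state);
            }
            // Step 1: create the attempt and record it in the map.
            String attemptId = appId + "_attempt_" + (attempts.size() + 1);
            currentAttempt = new MiniAttempt(attemptId);
            attempts.put(attemptId, currentAttempt);
            // Step 2: post the START event for the new attempt.
            handler.handle(new StartAttemptEvent(attemptId));
            // The application then ends up in ACCEPTED.
            state = AppState.ACCEPTED;
        }
    }

    public static void main(String[] args) {
        List<String> started = new ArrayList<>();
        MiniApp app = new MiniApp("app_1", event -> started.add(event.attemptId));
        app.onAppAccepted();
        System.out.println(app.state + " " + started);
    }
}
```

The sketch calls the handler synchronously to keep it short. The actual Hadoop 2.7.6 source for these steps follows.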
private static final class StartAppAttemptTransition extends RMAppTransition {
  @Override
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.createAndStartNewAttempt(false);
  };
}
private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
  createNewAttempt();
  // Fire the RMAppAttemptEventType.START event.
  handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
}
private void createNewAttempt() {
  ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
  RMAppAttempt attempt =
      new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
          submissionContext, conf,
          // The attempt limit applies here: this flag is true when the
          // failed attempts so far plus this one reach maxAppAttempts,
          // i.e. this may be the last attempt.
          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
  // Record the new attempt in the attempts map.
  attempts.put(appAttemptId, attempt);
  currentAttempt = attempt;
}
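To make the attempt numbering and the limit check in createNewAttempt() concrete, here is a minimal sketch. It only reproduces the two expressions `attempts.size() + 1` and `maxAppAttempts == (getNumFailedAppAttempts() + 1)`. The class and method names are hypothetical, the limit of 3 is an assumed value, and the loop assumes every earlier attempt counts as a failure, which the real getNumFailedAppAttempts() may not do.

```java
// Illustrative only; LastAttemptSketch and its methods are hypothetical names, not Hadoop APIs.
class LastAttemptSketch {
    /** Attempt numbers start at 1: the new number is the count of existing attempts plus one. */
    static int nextAttemptNumber(int existingAttempts) {
        return existingAttempts + 1;
    }

    /** Same comparison as in createNewAttempt(): failed attempts so far + 1 equals the limit. */
    static boolean maybeLastAttempt(int maxAppAttempts, int numFailedAppAttempts) {
        return maxAppAttempts == (numFailedAppAttempts + 1);
    }

    public static void main(String[] args) {
        int maxAppAttempts = 3; // assumed limit for this example
        // Assumption: every earlier attempt failed and counts toward the limit.
        for (int existing = 0; existing < maxAppAttempts; existing++) {
            int attemptNumber = nextAttemptNumber(existing);
            boolean last = maybeLastAttempt(maxAppAttempts, existing);
            System.out.println("attempt " + attemptNumber + " maybeLastAttempt=" + last);
        }
    }
}
```

With these assumptions only the third attempt is flagged as possibly the last one.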
To find out which handler processes events of type RMAppAttemptEventType, look in ResourceManager for the event dispatcher registered for that event type.
rmDispatcher.register(RMAppAttemptEventType.class,
    new ApplicationAttemptEventDispatcher(rmContext));
@Private
public static final class ApplicationAttemptEventDispatcher implements
    EventHandler<RMAppAttemptEvent> {
  private final RMContext rmContext;

  public ApplicationAttemptEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMAppAttemptEvent event) {
    ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
    // Note: despite its name, appAttemptId holds the ApplicationId.
    ApplicationId appAttemptId = appAttemptID.getApplicationId();
    RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);
    if (rmApp != null) {
      RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
      if (rmAppAttempt != null) {
        try {
          // Delegate the event to the attempt itself.
          rmAppAttempt.handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for applicationAttempt " + appAttemptId, t);
        }
      }
    }
  }
}
As the handle() method of ApplicationAttemptEventDispatcher shows, the event is ultimately handled by RMAppAttemptImpl.
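The routing path (event type class, then registered dispatcher, then application lookup, then attempt lookup) can be modeled in a few lines. This is a rough sketch under simplifying assumptions, not the Hadoop implementation: DispatchSketch, MiniDispatcher, AttemptEventRouter and MiniAttempt are hypothetical names, ids are plain strings and integers, and events are delivered synchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified model of dispatcher registration and routing; this is NOT Hadoop code.
// All class and field names here are hypothetical.
class DispatchSketch {
    enum AttemptEventType { START }

    static final class AttemptEvent {
        final String appId;
        final int attemptNumber;
        final AttemptEventType type;
        AttemptEvent(String appId, int attemptNumber, AttemptEventType type) {
            this.appId = appId;
            this.attemptNumber = attemptNumber;
            this.type = type;
        }
    }

    /** Stand-in for RMAppAttemptImpl: the final receiver of the event. */
    static final class MiniAttempt {
        final List<AttemptEventType> received = new ArrayList<>();
        void handle(AttemptEvent event) { received.add(event.type); }
    }

    /** Stand-in for the central dispatcher: maps an event-type class to its handler. */
    static final class MiniDispatcher {
        private final Map<Class<?>, Consumer<AttemptEvent>> handlers = new HashMap<>();

        void register(Class<?> eventTypeClass, Consumer<AttemptEvent> handler) {
            handlers.put(eventTypeClass, handler);
        }

        void dispatch(AttemptEvent event) {
            Consumer<AttemptEvent> handler = handlers.get(event.type.getDeclaringClass());
            if (handler == null) {
                throw new IllegalStateException("no handler registered for " + event.type);
            }
            handler.accept(event);
        }
    }

    /** Stand-in for ApplicationAttemptEventDispatcher: app lookup, then attempt lookup. */
    static final class AttemptEventRouter implements Consumer<AttemptEvent> {
        private final Map<String, Map<Integer, MiniAttempt>> apps;

        AttemptEventRouter(Map<String, Map<Integer, MiniAttempt>> apps) {
            this.apps = apps;
        }

        @Override
        public void accept(AttemptEvent event) {
            Map<Integer, MiniAttempt> attempts = apps.get(event.appId);
            if (attempts == null) {
                return; // unknown application: the event is dropped
            }
            MiniAttempt attempt = attempts.get(event.attemptNumber);
            if (attempt == null) {
                return; // unknown attempt: the event is dropped
            }
            attempt.handle(event); // the attempt itself processes the event
        }
    }

    public static void main(String[] args) {
        MiniAttempt attempt = new MiniAttempt();
        Map<Integer, MiniAttempt> attempts = new HashMap<>();
        attempts.put(1, attempt);
        Map<String, Map<Integer, MiniAttempt>> apps = new HashMap<>();
        apps.put("app_1", attempts);

        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.register(AttemptEventType.class, new AttemptEventRouter(apps));
        dispatcher.dispatch(new AttemptEvent("app_1", 1, AttemptEventType.START));
        System.out.println(attempt.received);
    }
}
```

As in the handle() method above, an event for an unknown application or attempt is silently ignored rather than treated as an error.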