版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhusirong/article/details/83591378
RMAppNewlySavingTransition(基于hadoop 2.7.6)
经过RMAppNewlySavingTransition后,RMApp的状态由NEW转为NEW_SAVING.根据提交上下文创建应用程序状态数据结构(主要是提交时间,启动事件,提交上下文,提交用户等),然后触发RMStateStoreEventType.STORE_APP事件,STORE_APP事件处理器会把应用程序数据结构存储到ZK上,详见3.
这个Transition比较简单,直接上代码清单
private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
}
}
鉴于详见1对rmContext的说明,stateStore是从ResourceManager上下文从提取的,可以猜到stateStore是ResourceManager初始化的时候创建的。见RMActiveServices的serviceInit:
protected void serviceInit(Configuration configuration) throws Exception {
...省略...
RMStateStore rmStore = null;
/**
* <!--启用自动恢复的yarn-site.xml配置-->
* <property>
* <name>yarn.resourcemanager.recovery.enabled</name>
* <value>true</value>
* </property>
*/
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
/**
如果不是开启了恢复功能,默认是NullRMStateStore(什么都不存储)
*/
if (isRecoveryEnabled) {
recoveryEnabled = true;
/**
* <!--指定resourcemanager的状态信息存储在zookeeper集群 yarn-site.xml-->
* <property>
* <name>yarn.resourcemanager.store.class</name>
* <value>
* org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
* </value>
* </property>
*/
rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
rmStore.setResourceManager(rm);
} catch (Exception e) {
LOG.error("Failed to init state store", e);
throw e;
}
rmContext.setStateStore(rmStore);
...省略...
}
默认情况下是NullRMStateStore,就是什么都不做,其基类是RMStateStore,storeNewApplication方法正是基类RMStateStore的方法.
@SuppressWarnings("unchecked")
public void storeNewApplication(RMApp app) {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateData appState =
ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(), context, app.getUser());
/**
* 注意,这里dispatcher是RMStateStore内部的调度器,由其自己的serviceInit方法中创建(非ResourceManager的全局调度器),然后把该事件放在了自己的内部事件队列中.而其中的rmDispatcher才是ResourceManager传递过来的全局调度器.RMStateStoreAppEvent的默认类型是RMStateStoreEventType.STORE_APP
*/
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
注意,这里的dispatcher是RMStateStore内部的调度器,由其自己的serviceInit方法创建(非ResourceManager的全局调度器),然后把该事件放在了自己的内部事件队列中.而其中的rmDispatcher才是ResourceManager传递过来的全局调度器.