「这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战」
本文的阅读需要一些YARN Service的知识,相关内容请查看:
下面就ResourceManager的启动流程做详细的源码解读
1 serviceInit()
ResourceManager是一个Service,所以ResourceManager的启动当然是要找init()方法,该类本身没有实现Init()方法,所以使用父类方法,父类的Init()方法调用了serviceInit(),并且在ResourceManager 重写,下面详细看ResourceManager .serviceInit()
该方法主要做了下面三件事情:
创建各种子service实例
new xxx();createxxx(); 复制代码
将该实例加入到serviceList集合中
addService(service); addIfService(object); 复制代码
执行各个子service的serviceInit()方法
super.init(); 复制代码
该类的serviceInit方法从上至下依次为:
生成上下文对象
this.rmContext = new RMContextImpl()
复制代码
加载配置文件core-site.xml
loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
复制代码
加载配置文件yarn-site.xml
loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
复制代码
HA的一些设置和相关服务地址的检查 默认:yarn.resourcemanager.ha.enabled = true
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
复制代码
初始化和添加AsyncDispatchen(重点)
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
复制代码
adminService
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
复制代码
选举相关
if (this.rmContext.isHAEnabled()) {
/**
* yarn.resourcemanager.ha.automatic-failover.emabled = true
* yarn.resourcemanager.ha.automatic-failover.embedde = true
* embedde = true 表明使用原生的zookeeperAPI来执行选举 否则使用curator框架来实现
* 不同点:zookeeperAPI复杂一点
*/
if (HAUtil.isAutomaticFailoverEnabled(conf)
&& HAUtil.isAutomaticFailoverEmbedded(conf)) {
//创建选举器 ActiveStandbyElectorBasedElectorService
// 阻塞的进行最大三次的尝试选举 不成功交给线程
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}
复制代码
当前节点为active RM才启动的服务 (service)
createAndInitActiveServices
复制代码
初始化各种服务(子service) 调用各种service实例的serviceInit( )方法
super.serviceInit(this.conf);
复制代码
初始化的内容很多,选取选取几个重要的内容进行详细说明:
2 createEmbeddedElector
下面详细看一下选举初始化的相关内容
createEmbeddedElector()方法作为入口
- 默认情况下是使用的zookeeperAPI进行选举,所以走else分支
createEmbeddedElector(){
if (curatorEnabled) {
this.zkManager = createAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
}
}
复制代码
- ActiveStandbyElectorBasedElectorService返回了该类型的选举实例,该类也是一个Service,所以该类的初始化找该类下面的serviceInit()方法
该方法主要做下面三件事情:
1.选举实例的创建 2.zookeeper的链接 3.创建选举过程中需要创建的路径 复制代码
ActiveStandbyElectorBasedElectorService.serviceInit()具体为:
获取zookeeper地址
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
复制代码
获取cluster id 和rescourcemanager id
String rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
复制代码
完成 zookeeper的链接 ,创建一个znode字节数组 localActiveNodeInfo对应zookeeperper znode上面所存储的数据
localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
复制代码
创建选举过程中需要的路径
zkBasePath = /yarn-leader-election (yarn-leader-election节点)
electionZNode = /yarn-leader-election/clusterId
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
复制代码
选举的尝试次数 默认为3次
int maxRetryNum =
conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
.getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
复制代码
创建选举实例
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
复制代码
确保父节点 zkBasePath 存在
elector.ensureParentZNode();
复制代码
初始化服务,调用相关子服务的serviceInit()方法
super.serviceInit(conf)
复制代码
3 createAndInitActiveServices
createAndInitActiveServices是只要当前RecourceManager是active状态才需要启动的一些服务
核心入口就是下面这句代码,它的内部创建了一个RMActiveServices对象,并且调用了该实例的init)方法。其实init()方法的内部就是调用servicelnit()方法
protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}
复制代码
接下来看RMActiveServices的serviceInit()方法:
serviceInit()
创建一个选举线程来完成选举
之前创建了一个 选举实例
此时创建了一个 选举线程
选举实例,会尝试最多3次选举,如果没有成功, 则启动这个线程来执行选举 最终都是使用选举实例的选举方法实现选举的
standByTransitionRunnable = new StandByTransitionRunnable();
复制代码
做token管理 指纹的一些工作
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
复制代码
在初始化方法中有好多好多的Service被创建并且添加到了serviceList列表中,后面就不在明确写出addService()方法
过期处理
当ApplicationMster 收到ResourceManager 新分配到一个container后,必须在一定的时间内(10min)在对应的NM上启动该container,否则就会回收
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
复制代码
资源调度策略 resourceScheduler 默认实现的是 CapacityScheduler
scheduler = createScheduler();
复制代码
除了有各种各样的Service被创建外,还有很多的eventHandler被注册到AsyncDispatcher
注册方法有两个参数:
- 第一个参数为: 事件类型
- 第二个参数为: 事件处理器
如果提交一个事件到 AsyncDispatcher AsyncDispatcher就会找到之前注册的事件所对应的EventHandler 的handle 方法执行事件的处理
EventHandler 有可能是状态机,也可能就是一个简单的 EventHandler
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
// Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(rmContext));
// Register event handler for RmAppAttemptEvents
rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(rmContext));
// Register event handler for RmNodes
rmDispatcher.register(
RMNodeEventType.class, new NodeEventDispatcher(rmContext));
复制代码
2.4 serviceStart()
接下来看ResourceManager的serviceStart()方法
刚启动当前的节点就成为standby
if (this.rmContext.isHAEnabled()) {
transitionToStandby(false);
}
复制代码
启动用户webapp 用户认证服务
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
复制代码
起初始化创建的所有子Service进行启动
super.serviceStart();
复制代码
如果没有启动HA 当前节点直接成为active,只有一台ResourceManager它肯定为active状态
if (!this.rmContext.isHAEnabled()) {
transitionToActive();
}
复制代码
此处注意,当启动了HA模式的时候,该怎么选举active节点呢?
还记得在ResourceManager.serviceInit()方法有下面一段代码
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
复制代码
创建了一个选举的实例并且加入了serviceList中,现在所有的子service启动当然也包含了这个Service的启动,关于HARN HA 的选举机制将会在下一篇文章整理讲述。
本人在学习的路上,上述文章如有错误还请指教批评,谢谢。