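Capacity Scheduler (based on Hadoop 2.7.6)
Main functions of the Capacity Scheduler:
1. Application initialization
After an application is submitted to the ResourceManager, the ResourceManager sends a SchedulerEventType.APP_ADDED event to the Capacity Scheduler. On receiving it, the Capacity Scheduler creates a SchedulerApplication object (whose main members are queue, user, and a FiCaSchedulerApp) to track and maintain the application's runtime information, and submits the application to the corresponding leaf queue. The leaf queue runs a series of validity checks on the application, which then waits for resources to be allocated.
2. Resource scheduling
When the ResourceManager receives a heartbeat from a NodeManager, it sends a SchedulerEventType.NODE_UPDATE event to the Capacity Scheduler. On receiving it, the Capacity Scheduler performs the following steps in order:
(1) Process the heartbeat information
(2) Allocate resources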
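To make the ordering of the two NODE_UPDATE steps concrete, here is a minimal Java sketch under heavy simplifying assumptions: resources are plain megabytes, pending requests are a flat list, and allocation is a naive first-fit pass. The class NodeUpdateSketch and its methods are hypothetical and are not part of the Hadoop API; the real scheduler assigns containers by walking the queue hierarchy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of NODE_UPDATE handling.
// NOT the Hadoop API: memory is a plain int (MB) and requests are a flat list.
class NodeUpdateSketch {
    int availableMb;                            // free memory on the heartbeating node
    final List<String> log = new ArrayList<>(); // records the order of the two steps

    NodeUpdateSketch(int availableMb) {
        this.availableMb = availableMb;
    }

    // Step (1): process the heartbeat; containers reported as completed free their memory.
    void processHeartbeat(List<Integer> completedContainerMb) {
        for (int mb : completedContainerMb) {
            availableMb += mb;
            log.add("released " + mb);
        }
    }

    // Step (2): allocate with a naive first-fit pass over pending requests
    // (an assumption for illustration; the real scheduler walks its queues).
    List<Integer> allocate(List<Integer> pendingRequestMb) {
        List<Integer> granted = new ArrayList<>();
        for (int mb : pendingRequestMb) {
            if (mb <= availableMb) {
                availableMb -= mb;
                granted.add(mb);
                log.add("allocated " + mb);
            }
        }
        return granted;
    }

    // NODE_UPDATE: heartbeat processing first, then allocation on the updated state.
    List<Integer> onNodeUpdate(List<Integer> completedMb, List<Integer> pendingMb) {
        processHeartbeat(completedMb);
        return allocate(pendingMb);
    }

    public static void main(String[] args) {
        NodeUpdateSketch node = new NodeUpdateSketch(1024);
        System.out.println(node.onNodeUpdate(List.of(2048), List.of(2048, 2048, 512))); // prints [2048, 512]
        System.out.println(node.log); // prints [released 2048, allocated 2048, allocated 512]
    }
}
```

Handling the heartbeat first matters here: in the usage above, the 2048 MB freed by the completed container is what lets the first 2048 MB request fit on a node that started with only 1024 MB free.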
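Main handling logic of the APP_ADDED event:
1. Create a SchedulerApplication object (main members: queue, user, and a FiCaSchedulerApp) to track and maintain the application's runtime information. The FiCaSchedulerApp member is instantiated later, when the APP_ATTEMPT_ADDED event is handled, and all subsequent resource scheduling and allocation is based on that FiCaSchedulerApp object.
2. Fire an RMAppEventType.APP_ACCEPTED event, which is handled by the StartAppAttemptTransition transition.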
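The APP_ACCEPTED step can be pictured as a transition in the application's state machine. The following is a hypothetical miniature, not YARN's RMAppImpl: it only models SUBMITTED moving to ACCEPTED (or REJECTED), with a counter standing in for the new attempt that StartAppAttemptTransition creates. As we understand Hadoop 2.7, starting that attempt is what eventually leads the scheduler to receive the APP_ATTEMPT_ADDED event mentioned above.

```java
// Hypothetical miniature of the RMApp state machine around APP_ACCEPTED;
// the names echo YARN's, but this is not YARN code.
class MiniRMApp {
    enum State { SUBMITTED, ACCEPTED, REJECTED }

    enum Event { APP_ACCEPTED, APP_REJECTED }

    State state = State.SUBMITTED;
    int attempts = 0; // attempts created so far

    void handle(Event event) {
        // In this sketch only SUBMITTED reacts to these events; anything else is invalid.
        if (state != State.SUBMITTED) {
            throw new IllegalStateException("Invalid event " + event + " in state " + state);
        }
        switch (event) {
            case APP_ACCEPTED:
                // Stand-in for StartAppAttemptTransition: create a new attempt.
                attempts++;
                state = State.ACCEPTED;
                break;
            case APP_REJECTED:
                state = State.REJECTED;
                break;
        }
    }

    public static void main(String[] args) {
        MiniRMApp app = new MiniRMApp();
        app.handle(Event.APP_ACCEPTED);
        System.out.println(app.state + ", attempts=" + app.attempts); // prints ACCEPTED, attempts=1
    }
}
```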
The code that handles the APP_ADDED event:
@Override
public void handle(SchedulerEvent event) { // CapacityScheduler.handle
  switch (event.getType()) {
  // ... omitted ...
  case APP_ADDED:
  {
    AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
    String queueName =
        resolveReservationQueueName(appAddedEvent.getQueue(),
            appAddedEvent.getApplicationId(),
            appAddedEvent.getReservationID());
    if (queueName != null) {
      if (!appAddedEvent.getIsAppRecovering()) {
        addApplication(appAddedEvent.getApplicationId(), queueName,
            appAddedEvent.getUser());
      } else {
        addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
            appAddedEvent.getUser());
      }
    }
  }
  break;
  // ... omitted ...
  }
}
The main logic is in addApplication (addApplicationOnRecovery handles recovery and is not covered here). Code listing:
private synchronized void addApplication(ApplicationId applicationId,
    String queueName, String user) {
  queueName = getQueueMappings(applicationId, queueName, user);
  if (queueName == null) {
    return;
  }
  CSQueue queue = getQueue(queueName);
  if (queue == null) {
    String message = "Application " + applicationId +
        " submitted by user " + user + " to unknown queue: " + queueName;
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId,
            RMAppEventType.APP_REJECTED, message));
    return;
  }
  /**
   * Applications can only be submitted to a leaf queue
   */
  if (!(queue instanceof LeafQueue)) {
    String message = "Application " + applicationId +
        " submitted by user " + user + " to non-leaf queue: " + queueName;
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId,
            RMAppEventType.APP_REJECTED, message));
    return;
  }
  try {
    /**
     * Submit the application to the leaf queue
     */
    queue.submitApplication(applicationId, user, queueName);
  } catch (AccessControlException ace) {
    LOG.info("Failed to submit application " + applicationId + " to queue "
        + queueName + " from user " + user, ace);
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId,
            RMAppEventType.APP_REJECTED, ace.toString()));
    return;
  }
  queue.getMetrics().submitApp(user);
  /**
   * Create a SchedulerApplication<FiCaSchedulerApp> object to track and maintain
   * the application's runtime information; the FiCaSchedulerApp itself is
   * instantiated when SchedulerEventType.APP_ATTEMPT_ADDED is handled
   */
  SchedulerApplication<FiCaSchedulerApp> application =
      new SchedulerApplication<FiCaSchedulerApp>(queue, user);
  applications.put(applicationId, application);
  LOG.info("Accepted application " + applicationId + " from user: " + user
      + ", in queue: " + queueName);
  rmContext.getDispatcher().getEventHandler()
      .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
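The rejection paths in addApplication can be summarized with a small, hypothetical Java sketch. It assumes queues are looked up by name in a map, skips the queue-mapping step, replaces the real LeafQueue.submitApplication call with a boolean flag, and returns an outcome enum instead of dispatching RMAppEvent objects. AddApplicationSketch, SketchQueue and Outcome are illustrative names, not Hadoop classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay of the decision flow in CapacityScheduler.addApplication.
// Queue mapping is skipped; SketchQueue.acceptsSubmissions stands in for
// LeafQueue.submitApplication() succeeding instead of throwing.
class AddApplicationSketch {
    enum Outcome { ACCEPTED, REJECTED_UNKNOWN_QUEUE, REJECTED_NON_LEAF, REJECTED_BY_QUEUE }

    static class SketchQueue {
        final boolean leaf;
        final boolean acceptsSubmissions;

        SketchQueue(boolean leaf, boolean acceptsSubmissions) {
            this.leaf = leaf;
            this.acceptsSubmissions = acceptsSubmissions;
        }
    }

    final Map<String, SketchQueue> queues = new HashMap<>();
    // appId -> "user@queue", standing in for applications.put(appId, new SchedulerApplication<>(queue, user))
    final Map<String, String> applications = new HashMap<>();

    Outcome addApplication(String appId, String queueName, String user) {
        SketchQueue queue = queues.get(queueName);
        if (queue == null) {
            return Outcome.REJECTED_UNKNOWN_QUEUE; // real code: APP_REJECTED, "unknown queue"
        }
        if (!queue.leaf) {
            return Outcome.REJECTED_NON_LEAF;      // real code: APP_REJECTED, "non-leaf queue"
        }
        if (!queue.acceptsSubmissions) {
            return Outcome.REJECTED_BY_QUEUE;      // real code: AccessControlException, then APP_REJECTED
        }
        applications.put(appId, user + "@" + queueName);
        return Outcome.ACCEPTED;                   // real code: APP_ACCEPTED is fired here
    }

    public static void main(String[] args) {
        AddApplicationSketch cs = new AddApplicationSketch();
        cs.queues.put("root", new SketchQueue(false, true));
        cs.queues.put("default", new SketchQueue(true, true));
        System.out.println(cs.addApplication("app_1", "root", "alice"));    // prints REJECTED_NON_LEAF
        System.out.println(cs.addApplication("app_2", "default", "alice")); // prints ACCEPTED
    }
}
```

The point the sketch preserves is that the application is recorded only after every check has passed, so a rejected submission leaves no entry behind.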
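Main logic:
1. Validate the target queue: it must exist and be a leaf queue, and the leaf queue's own checks in submitApplication must pass
2. Create a SchedulerApplication object to track and maintain the application's runtime information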
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application);
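Only the SchedulerApplication object is created here; the FiCaSchedulerApp object is not instantiated yet. It is instantiated when the SchedulerEventType.APP_ATTEMPT_ADDED event is handled, and from then on records the runtime information needed for scheduling.
3. Fire the RMAppEventType.APP_ACCEPTED event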
queue.submitApplication(applicationId, user, queueName);
// ... omitted ...
/**
 * Fire the RMAppEventType.APP_ACCEPTED event
 */
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
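The two-step creation described above (a SchedulerApplication on APP_ADDED, its FiCaSchedulerApp on APP_ATTEMPT_ADDED) can be sketched as follows. AppRecord<T> and AttemptState are hypothetical stand-ins for SchedulerApplication<T> and FiCaSchedulerApp; only the idea of a holder whose attempt field stays null until the second event arrives is taken from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two-phase creation; not the YARN classes.
class TwoPhaseSketch {
    // Stand-in for FiCaSchedulerApp: per-attempt scheduling state.
    static class AttemptState {
        final String attemptId;

        AttemptState(String attemptId) {
            this.attemptId = attemptId;
        }
    }

    // Stand-in for SchedulerApplication<T>: queue, user and the current attempt.
    static class AppRecord<T> {
        final String queue;
        final String user;
        private T currentAttempt; // stays null until the attempt is added

        AppRecord(String queue, String user) {
            this.queue = queue;
            this.user = user;
        }

        T getCurrentAttempt() { return currentAttempt; }

        void setCurrentAttempt(T attempt) { this.currentAttempt = attempt; }
    }

    final Map<String, AppRecord<AttemptState>> applications = new HashMap<>();

    // Phase 1 (APP_ADDED): only the per-application record is created.
    void onAppAdded(String appId, String queue, String user) {
        applications.put(appId, new AppRecord<>(queue, user));
    }

    // Phase 2 (APP_ATTEMPT_ADDED): the per-attempt state is attached to the record.
    void onAppAttemptAdded(String appId, String attemptId) {
        AppRecord<AttemptState> app = applications.get(appId);
        if (app == null) {
            throw new IllegalStateException("attempt for unknown application " + appId);
        }
        app.setCurrentAttempt(new AttemptState(attemptId));
    }

    public static void main(String[] args) {
        TwoPhaseSketch cs = new TwoPhaseSketch();
        cs.onAppAdded("app_1", "default", "alice");
        System.out.println(cs.applications.get("app_1").getCurrentAttempt()); // prints null
        cs.onAppAttemptAdded("app_1", "appattempt_1");
        System.out.println(cs.applications.get("app_1").getCurrentAttempt().attemptId); // prints appattempt_1
    }
}
```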
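Main logic of LeafQueue.submitApplication:
- Validity checks
  - The queue state must be RUNNING
  - The maximum number of applications the queue can accept must not be exceeded
  - The maximum number of applications each user can submit must not be exceeded
- The parent queue increments its application count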
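The checks listed above can be sketched in Java as follows. This is a hypothetical simplification, not the LeafQueue/ParentQueue code: the real methods throw AccessControlException, while this sketch throws IllegalStateException, and the leaf queue's own counters are assumed to be maintained elsewhere, so the sketch only reads them. Only after all checks pass does the parent queue (and, in this sketch, recursively each ancestor) increment its application count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the leaf-queue submission checks; not Hadoop code.
// Real code throws AccessControlException; IllegalStateException is used instead.
class SketchLeaf {
    final SketchParent parent;
    final int maxApplications;
    final int maxApplicationsPerUser;
    boolean running = true;
    // Assumed to be maintained elsewhere (e.g. when attempts are registered); only read here.
    int numApplications = 0;
    final Map<String, Integer> appsPerUser = new HashMap<>();

    SketchLeaf(SketchParent parent, int maxApplications, int maxApplicationsPerUser) {
        this.parent = parent;
        this.maxApplications = maxApplications;
        this.maxApplicationsPerUser = maxApplicationsPerUser;
    }

    void submitApplication(String user) {
        if (!running) {
            throw new IllegalStateException("queue is not RUNNING");
        }
        if (numApplications >= maxApplications) {
            throw new IllegalStateException("queue already has " + numApplications + " applications");
        }
        if (appsPerUser.getOrDefault(user, 0) >= maxApplicationsPerUser) {
            throw new IllegalStateException("user " + user + " reached the per-user limit");
        }
        parent.submitApplication(); // all checks passed: ancestors update their counts
    }

    public static void main(String[] args) {
        SketchParent root = new SketchParent(null);
        SketchLeaf leaf = new SketchLeaf(root, 10, 1);
        leaf.submitApplication("alice");
        System.out.println(root.numApplications); // prints 1
    }
}

class SketchParent {
    final SketchParent parent; // null for the root queue
    int numApplications = 0;

    SketchParent(SketchParent parent) {
        this.parent = parent;
    }

    // Counts the application, then informs its own parent.
    void submitApplication() {
        numApplications++;
        if (parent != null) {
            parent.submitApplication();
        }
    }
}
```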