Capacity Scheduler(基于hadoop 2.7.6)
Capacity Scheduler 主要功能:
1.应用程序初始化
应用程序提交到ResourceManager后,ResourceManager会向Capacity Scheduler发送一个SchedulerEventType.APP_ADDED事件,Capacity Scheduler收到该事件后,将为应用程序创建一个SchedulerApplication(主要包括queue,user,FiCaSchedulerApp成员)对象,用于跟踪和维护该应用程序的运行时信息,同时将应用程序提交到对应的叶子队列,叶子队列会对应用程序进行一系列合法性检查.
2.资源调度
当ResourceManager收到来自NodeManager发送的心跳信息后,将向Capacity Scheduler发送一个SchedulerEventType.NODE_UPDATE事件,Capacity Scheduler收到该事件后,会依次进行以下操作:
(1)处理心跳信息
(2)资源分配
APP_ATTEMPT_ADDED事件主要处理逻辑:
1.创建FiCaSchedulerApp类型的application,通过队列资源上限和用户资源上限检查后,记录到队列的activeApplications,等待资源调度分配(即收到NodeManager心跳触发的NODE_UPDATE事件时进行资源调度分配).
2.触发RMAppAttemptEventType.ATTEMPT_ADDED事件,该事件由ScheduleTransition转换器处理.
下面贴出SchedulerEventType.APP_ATTEMPT_ADDED的处理代码清单:
@Override
public void CapacityScheduler.handle(SchedulerEvent event) {
...省略...
/**
* APP_ATTEMPT_ADDED: cast the event and forward its attempt id, its
* transfer-state flag and its recovering flag to addApplicationAttempt.
*
* Summary of what the downstream leaf-queue code in these notes does:
* 1. The user's pending/active application counters track how many
*    applications the user has submitted and activated.
* 2. The queue's pendingApplications and activeApplications collections are
*    used to decide whether the attempt fits the AM resource limits; if it
*    does, the attempt is added to the queue's activeApplications and waits
*    for resources to be scheduled.
*/
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
...省略...
}
/**
 * Registers a new application attempt with the scheduler.
 *
 * <p>Looks up the {@code SchedulerApplication} for the attempt's application
 * id, builds a {@code FiCaSchedulerApp} for the attempt, optionally carries
 * state over from the application's current attempt, makes the new attempt
 * the current one, submits it to the application's queue and, unless the
 * attempt is recovering, dispatches
 * {@code RMAppAttemptEventType.ATTEMPT_ADDED}.
 *
 * @param applicationAttemptId id of the attempt being added
 * @param transferStateFromPreviousAttempt when true, the new attempt copies
 *        state from the application's current attempt before replacing it
 * @param isAttemptRecovering when true, no ATTEMPT_ADDED event is dispatched
 */
private synchronized void addApplicationAttempt(
    ApplicationAttemptId applicationAttemptId,
    boolean transferStateFromPreviousAttempt,
    boolean isAttemptRecovering) {
  // According to the accompanying notes, the SchedulerApplication was already
  // created while handling SchedulerEventType.APP_ADDED. The author reads
  // FiCaSchedulerApp as the scheduler-side representation of an application
  // attempt.
  SchedulerApplication<FiCaSchedulerApp> schedulerApp =
      applications.get(applicationAttemptId.getApplicationId());
  if (schedulerApp == null) {
    LOG.warn("Application " + applicationAttemptId.getApplicationId() +
        " cannot be found in scheduler.");
    return;
  }

  CSQueue targetQueue = (CSQueue) schedulerApp.getQueue();

  // The FiCaSchedulerApp records the runtime information of this attempt.
  // Per the original notes (not verifiable from this excerpt) it extends
  // SchedulerApplicationAttempt and its allocation-related members are:
  //   1. appSchedulingInfo : AppSchedulingInfo
  //   2. attemptId         : ApplicationAttemptId
  //   3. liveContainers    : Map<ContainerId, RMContainer>
  // appSchedulingInfo is said to hold a
  // Map<Priority, Map<String, ResourceRequest>> of outstanding requests that
  // later drives the allocation decisions.
  FiCaSchedulerApp newAttempt =
      new FiCaSchedulerApp(applicationAttemptId, schedulerApp.getUser(),
          targetQueue, targetQueue.getActiveUsersManager(), rmContext);

  // Reuse the state recorded by the previous attempt when requested.
  // NOTE(review): the original notes claim this flag is hard-coded to false
  // by default - confirm against the callers.
  if (transferStateFromPreviousAttempt) {
    newAttempt.transferStateFromPreviousAttempt(
        schedulerApp.getCurrentAppAttempt());
  }
  schedulerApp.setCurrentAppAttempt(newAttempt);

  // Hand the attempt to its queue. The queue was resolved from the submitted
  // queue name during APP_ADDED handling (per the notes). The leaf queue
  // counts the attempt for the user and checks it against the AM resource
  // limits before placing it in its active list.
  targetQueue.submitApplicationAttempt(newAttempt, schedulerApp.getUser());
  LOG.info("Added Application Attempt " + applicationAttemptId
      + " to scheduler from user " + schedulerApp.getUser() + " in queue "
      + targetQueue.getQueueName());

  if (!isAttemptRecovering) {
    // Fire RMAppAttemptEventType.ATTEMPT_ADDED; per the notes it is handled
    // by the ScheduleTransition.
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
  } else if (LOG.isDebugEnabled()) {
    LOG.debug(applicationAttemptId
        + " is recovering. Skipping notifying ATTEMPT_ADDED");
  }
}
/**
 * Accepts an application attempt into this queue and then propagates the
 * submission to the parent queue.
 *
 * @param application the attempt being submitted (a FiCaSchedulerApp)
 * @param userName name of the user the attempt belongs to
 */
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
    String userName) {
  // Careful! Locking order is important!
  synchronized (this) {
    // Add the attempt to our data-structures under this queue's lock.
    addApplicationAttempt(application, getUser(userName));
  }

  // We don't want to update metrics for move app
  boolean stillPending = application.isPending();
  if (stillPending) {
    metrics.submitAppAttempt(userName);
  }

  // The parent is notified outside of the synchronized section above.
  getParent().submitApplicationAttempt(application, userName);
}
/**
 * Records a newly submitted attempt in this queue and tries to activate
 * pending attempts.
 *
 * <p>The attempt is counted against the user, appended to
 * {@code pendingApplications}, indexed in {@code applicationAttemptMap}, and
 * then {@code activateApplications()} is run; a summary line with the user
 * and queue pending/active counts is logged afterwards.
 *
 * @param application the attempt to add (a FiCaSchedulerApp)
 * @param user the per-user bookkeeping object of the attempt's owner
 */
private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
    User user) {
  // Accept: per the original notes this only bumps the user's counter of
  // submitted applications.
  user.submitApplication();

  // Queue the attempt as pending; activateApplications() walks this
  // collection below.
  pendingApplications.add(application);

  // Index the attempt by its attempt id.
  // NOTE(review): the original notes say this map exists so container
  // assignment can look attempts up quickly with less lock contention -
  // confirm against the map's users.
  applicationAttemptMap.put(application.getApplicationAttemptId(), application);

  // Activate applications: attempts passing the queue and user AM resource
  // limits move to activeApplications and wait for resources.
  activateApplications();

  StringBuilder summary = new StringBuilder("Application added -");
  summary.append(" appId: ").append(application.getApplicationId());
  summary.append(" user: ").append(user).append(",");
  summary.append(" leaf-queue: ").append(getQueueName());
  summary.append(" #user-pending-applications: ")
      .append(user.getPendingApplications());
  summary.append(" #user-active-applications: ")
      .append(user.getActiveApplications());
  summary.append(" #queue-pending-applications: ")
      .append(getNumPendingApplications());
  summary.append(" #queue-active-applications: ")
      .append(getNumActiveApplications());
  LOG.info(summary.toString());
}
/**
 * Promotes pending attempts to {@code activeApplications} when starting their
 * application master stays within the queue-level and user-level AM resource
 * limits.
 *
 * <p>Each pending attempt is checked in iteration order. An attempt that
 * would exceed a limit is skipped (it stays pending) unless the queue
 * currently has no active application, in which case enforcement is skipped
 * so that at least one application can start.
 */
private synchronized void activateApplications() {
  // limit of allowed resource usage for application masters
  Resource queueAmLimit = getAMResourceLimit();
  Resource perUserAmLimit = getUserAMResourceLimit();

  // Walk the attempts (FiCaSchedulerApp) that are waiting for activation.
  Iterator<FiCaSchedulerApp> pendingIt = pendingApplications.iterator();
  while (pendingIt.hasNext()) {
    FiCaSchedulerApp candidate = pendingIt.next();

    // Check am resource limit
    // NOTE(review): the original notes state the AM resource is initialised
    // on the client at submission time (default memory:1536, vcores:1, set
    // in YARNRunner) - not verifiable from this excerpt.
    Resource queueAmIfStarted =
        Resources.add(candidate.getAMResource(), queueUsage.getAMUsed());

    if (LOG.isDebugEnabled()) {
      LOG.debug("application AMResource " + candidate.getAMResource() +
          " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
          " amLimit " + queueAmLimit +
          " lastClusterResource " + lastClusterResource +
          " amIfStarted " + queueAmIfStarted);
    }

    // Would the queue's AM usage stay within its limit?
    // Per the original notes: resourceCalculator is DefaultResourceCalculator
    // by default (memory only) and can be configured as
    // DominantResourceCalculator (memory and CPU); the author observed that
    // lastClusterResource appears unused by the default comparison, which
    // boils down to (amIfStarted - amLimit) <= 0. Presumably that holds only
    // for the default calculator - TODO confirm.
    boolean withinQueueLimit = Resources.lessThanOrEqual(
        resourceCalculator, lastClusterResource, queueAmIfStarted,
        queueAmLimit);
    if (!withinQueueLimit) {
      if (getNumActiveApplications() >= 1) {
        LOG.info("not starting application as amIfStarted exceeds amLimit");
        continue;
      }
      LOG.warn("maximum-am-resource-percent is insufficient to start a" +
          " single application in queue, it is likely set too low." +
          " skipping enforcement to allow at least one application to start");
    }

    // Check user am resource limit
    User owner = getUser(candidate.getUser());
    Resource userAmIfStarted =
        Resources.add(candidate.getAMResource(),
            owner.getConsumedAMResources());

    // Would the user's AM usage stay within the per-user limit?
    boolean withinUserLimit = Resources.lessThanOrEqual(
        resourceCalculator, lastClusterResource, userAmIfStarted,
        perUserAmLimit);
    if (!withinUserLimit) {
      if (getNumActiveApplications() >= 1) {
        LOG.info("not starting application as amIfStarted exceeds " +
            "userAmLimit");
        continue;
      }
      LOG.warn("maximum-am-resource-percent is insufficient to start a" +
          " single application in queue for user, it is likely set too low." +
          " skipping enforcement to allow at least one application to start");
    }

    // Move the attempt from the user's pending count to its active count
    // (as described by the original notes).
    owner.activateApplication();
    // Add to activeApplications; resources are assigned later when available.
    activeApplications.add(candidate);
    // Account the AM resource against the queue ...
    queueUsage.incAMUsed(candidate.getAMResource());
    // ... and against the user.
    owner.getResourceUsage().incAMUsed(candidate.getAMResource());
    // The attempt is no longer pending.
    pendingIt.remove();

    LOG.info("Application " + candidate.getApplicationId() +
        " from user: " + candidate.getUser() +
        " activated in queue: " + getQueueName());
  }
}