版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhusirong/article/details/83745175
ScheduleTransition(基于hadoop 2.7.6)
主要逻辑:
1.将启动AM所需的资源需求amReq更新到(FiCaSchedulerApp)application中的requests列表(类型为Map<Priority, Map<String, ResourceRequest>>,实际由其成员变量appSchedulingInfo维护).
CapacityScheduler进行资源分配时,会对requests列表中的资源需求进行匹配.
2.经ScheduleTransition转换器处理后,RMAppAttemptState由SUBMITTED转换为SCHEDULED(若是非托管AM(unmanaged AM),则不申请AM资源,而是先保存attempt,状态转换为LAUNCHED_UNMANAGED_SAVING).
代码清单:
/**
 * Transition taken by an application attempt when it leaves SUBMITTED.
 *
 * <p>Two outcomes:
 * <ul>
 *   <li>Unmanaged AM: nothing is requested from the scheduler; the attempt is stored
 *       and the next state is {@code LAUNCHED_UNMANAGED_SAVING}.</li>
 *   <li>Managed AM: the attempt's {@code amReq} is turned into a request for exactly one
 *       container at {@code AM_CONTAINER_PRIORITY} on any node ({@code ResourceRequest.ANY},
 *       relaxed locality). It is handed to the scheduler's {@code allocate}, and the next
 *       state is {@code SCHEDULED}.</li>
 * </ul>
 *
 * <p>NOTE(review): the request's capability is not set here; presumably it was filled
 * from the submission context's resource elsewhere — confirm.
 */
@VisibleForTesting
public static final class ScheduleTransition
    implements MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {

  @Override
  public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
    // Unmanaged AM: persist the attempt first, then move towards LAUNCHED.
    if (appAttempt.submissionContext.getUnmanagedAM()) {
      appAttempt.storeAttempt();
      return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
    }

    // Managed AM: describe a single AM container that may be placed on any node.
    ResourceRequest amRequest = appAttempt.amReq;
    amRequest.setNumContainers(1);
    amRequest.setPriority(AM_CONTAINER_PRIORITY);
    amRequest.setResourceName(ResourceRequest.ANY);
    amRequest.setRelaxLocality(true);

    // This allocate call only registers the ask with the scheduler (no releases, no
    // blacklist changes). Nothing should already be allocated at this point, hence the
    // assertion below.
    Allocation allocation = appAttempt.scheduler.allocate(
        appAttempt.applicationAttemptId,
        Collections.singletonList(amRequest),
        EMPTY_CONTAINER_RELEASE_LIST, null, null);
    boolean hasContainerList =
        allocation != null && allocation.getContainers() != null;
    if (hasContainerList) {
      assert allocation.getContainers().isEmpty();
    }
    return RMAppAttemptState.SCHEDULED;
  }
}
/**
 * Records an application attempt's new resource asks, container releases and blacklist
 * changes, and returns what the application currently has to hand back.
 *
 * <p>Order of work:
 * <ol>
 *   <li>validate the attempt;</li>
 *   <li>normalize the asks;</li>
 *   <li>release containers (outside the application lock);</li>
 *   <li>then, while holding {@code synchronized (application)}: merge the asks into the
 *       application's request table, update the blacklist, and build the result with
 *       {@code getAllocation}.</li>
 * </ol>
 *
 * <p>The method is annotated {@code @Lock(Lock.NoLock.class)} and is not itself
 * synchronized. The only lock taken here is the monitor of the application object.
 *
 * @param applicationAttemptId the attempt issuing the call
 * @param ask resource requests to merge; first passed to SchedulerUtils.normalizeRequests,
 *     whose return value is unused, so any normalization presumably happens in place
 * @param release containers to release; on the AM-launch path (ScheduleTransition) this is
 *     EMPTY_CONTAINER_RELEASE_LIST
 * @param blacklistAdditions forwarded to updateBlacklist (null on the AM-launch path)
 * @param blacklistRemovals forwarded to updateBlacklist (null on the AM-launch path)
 * @return EMPTY_ALLOCATION if no application is found for the attempt, if the found
 *     application's attempt id differs from {@code applicationAttemptId}, or if the
 *     application is stopped; otherwise the result of {@code application.getAllocation(...)}
 */
@Override
@Lock(Lock.NoLock.class)
public Allocation CapacityScheduler.allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed or non existent application " +
applicationAttemptId.getApplicationId());
return EMPTY_ALLOCATION;
}
// Reject calls from an attempt that is not the one the application currently tracks.
if (!application.getApplicationAttemptId().equals(applicationAttemptId)) {
LOG.error("Calling allocate on previous or removed " +
"or non existent application attempt " + applicationAttemptId);
return EMPTY_ALLOCATION;
}
// Sanity check
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), getMaximumResourceCapability());
// Release containers
/**
* Release the listed containers. On the AM-launch path the release list is
* EMPTY_CONTAINER_RELEASE_LIST, so nothing is released there.
* NOTE(review): per the original author, releaseContainers fires an
* RMContainerEventType.RELEASED event per container; its body is not shown here — confirm.
*/
releaseContainers(release, application);
synchronized (application) {
// make sure we aren't stopping/removing the application
// when the allocate comes in
if (application.isStopped()) {
LOG.info("Calling allocate on a stopped " +
"application " + applicationAttemptId);
return EMPTY_ALLOCATION;
}
if (!ask.isEmpty()) {
if(LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + applicationAttemptId +
" application=" + application);
}
application.showRequests();
// Update application requests
/**
* Merge the new asks into the application's request table
* (Map<Priority, Map<String, ResourceRequest>> requests, held by its appSchedulingInfo).
*/
application.updateResourceRequests(ask);
// NOTE(review): unlike the surrounding debug logs this one is not guarded by
// LOG.isDebugEnabled(); harmless for a constant string, but inconsistent.
LOG.debug("allocate: post-update");
application.showRequests();
}
if(LOG.isDebugEnabled()) {
LOG.debug("allocate:" +
" applicationAttemptId=" + applicationAttemptId +
" #ask=" + ask.size());
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
/**
* Build the response from the application's current state.
* NOTE(review): per the original author, getAllocation triggers AcquiredTransition on
* the returned containers, which registers them with containerAllocationExpirer;
* getAllocation's body is not shown here — confirm.
*/
return application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
}
}
/**
 * Forwards new resource requests to this attempt's appSchedulingInfo, unless the attempt
 * has already been stopped (in which case the requests are silently dropped).
 *
 * <p>Always passes {@code recoverPreemptedRequest = false}, so requests are merged as
 * ordinary asks rather than as recovered preempted containers.
 *
 * @param requests the asks to merge into the scheduling info's request table
 */
public synchronized void SchedulerApplicationAttempt.updateResourceRequests(
    List<ResourceRequest> requests) {
  // A stopped attempt no longer accepts asks.
  if (isStopped) {
    return;
  }
  appSchedulingInfo.updateResourceRequests(requests, false);
}
/**
 * Merges a batch of resource requests into this object's
 * {@code requests} table (Map<Priority, Map<String, ResourceRequest>>).
 *
 * <p>This is presumably AppSchedulingInfo's method: it is the overload that
 * SchedulerApplicationAttempt calls via appSchedulingInfo with
 * {@code recoverPreemptedRequest = false}.
 *
 * <p>For each request:
 * <ul>
 *   <li>An ANY request (resourceName == ResourceRequest.ANY):
 *     <ul>
 *       <li>If it asks for more than zero containers, its priority is added to
 *           {@code priorities} and the application is activated for {@code user}.</li>
 *       <li>If there was no previous ANY request at this priority, or the node label
 *           changed, its node-label expression is pushed down to every existing non-ANY
 *           request at the same priority.</li>
 *       <li>Pending-resource metrics are updated.</li>
 *     </ul>
 *   </li>
 *   <li>A non-ANY request inherits the node-label expression of the ANY request at the
 *       same priority, if one exists.</li>
 *   <li>The request then replaces whatever was stored under (priority, resourceName).</li>
 * </ul>
 *
 * @param requests the asks to merge; note these objects are stored directly and may be
 *     mutated (node label, and container count when recovering)
 * @param recoverPreemptedRequest when true and a request already exists at the same key, the
 *     stored request's container count becomes the previous count + 1
 */
synchronized public void updateResourceRequests(
List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
QueueMetrics metrics = queue.getMetrics();
// Update resource requests
for (ResourceRequest request : requests) {
Priority priority = request.getPriority();
String resourceName = request.getResourceName();
// Pending metrics are only maintained from the ANY-level request.
boolean updatePendingResources = false;
ResourceRequest lastRequest = null;
if (resourceName.equals(ResourceRequest.ANY)) {
if (LOG.isDebugEnabled()) {
LOG.debug("update:" + " application=" + applicationId + " request="
+ request);
}
updatePendingResources = true;
if (request.getNumContainers() > 0) {
priorities.add(priority);
activeUsersManager.activateApplication(user, applicationId);
}
ResourceRequest previousAnyRequest =
getResourceRequest(priority, resourceName);
/**
* Propagate the latest node-label expression to the existing non-ANY requests
* at the same priority (only when there was no previous ANY request or its label changed).
*/
if ((null == previousAnyRequest)
|| isRequestLabelChanged(previousAnyRequest, request)) {
Map<String, ResourceRequest> resourceRequest =
getResourceRequests(priority);
if (resourceRequest != null) {
for (ResourceRequest r : resourceRequest.values()) {
if (!r.getResourceName().equals(ResourceRequest.ANY)) {
r.setNodeLabelExpression(request.getNodeLabelExpression());
}
}
}
}
} else {
// Non-ANY requests take their node label from the ANY request at this priority.
ResourceRequest anyRequest =
getResourceRequest(priority, ResourceRequest.ANY);
if (anyRequest != null) {
request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
}
}
/**
* this.requests maps priority -> (resourceName -> request) for this application.
* Look up the inner map for this request's priority.
*/
Map<String, ResourceRequest> asks = this.requests.get(priority);
if (asks == null) {
/**
* First request at this priority: create and register an empty inner map.
*/
asks = new ConcurrentHashMap<String, ResourceRequest>();
this.requests.put(priority, asks);
}
/**
* Fetch the request currently stored under the same resourceName, if any.
*
* The key is only (priority, resourceName): a new request with a different capability
* (memory / vcores) at the same key replaces the old one rather than coexisting with it.
*/
lastRequest = asks.get(resourceName);
/**
* On the ScheduleTransition path recoverPreemptedRequest is always false
* (SchedulerApplicationAttempt.updateResourceRequests passes false).
*/
if (recoverPreemptedRequest && lastRequest != null) {
/**
* Recovering a preempted container: ask for one more container than before.
*/
request.setNumContainers(lastRequest.getNumContainers() + 1);
}
/**
* Store the new request, replacing any previous one at the same key. The previous
* request is not simply lost: it is kept in lastRequest long enough to back its
* numbers out of the pending metrics below.
*
* The original author's explanation of why replacing is fine:
* - an application has one AM, with a single AM resource request;
* - CapacityScheduler keeps (SchedulerApplication)applications, one FiCaSchedulerApp per
*   application (see the APP_ADDED / APP_ATTEMPT_ADDED handling);
* - appSchedulingInfo is a member of FiCaSchedulerApp, and requests is a member of
*   appSchedulingInfo.
* So only the latest request needs to be kept.
*
* NOTE(review): this method accepts arbitrary priorities and resource names, not just the
* AM request, so the "one AM" argument only covers the AM-launch case; in general the
* visible behavior is simply "latest request per (priority, resourceName) wins".
*/
asks.put(resourceName, request);
if (updatePendingResources) {
// Similarly, deactivate application?
// A zero (or negative) ANY count withdraws this priority and may deactivate the app.
if (request.getNumContainers() <= 0) {
priorities.remove(priority);
LOG.info("checking for deactivate of application :"
+ this.applicationId);
checkForDeactivation();
}
// Net effect on pending metrics: add the new request, subtract the one it replaced.
int lastRequestContainers = lastRequest != null ? lastRequest
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
.getCapability() : Resources.none();
metrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);
}
}
}
之前看到有同学在自己的博客里提出过一个问题:之前的资源需求都被覆盖掉、不要了吗?
是这样的:并不是不要了,而是对于一个应用程序而言只有一个AM,一个AM也只有一个AM资源需求.
回看APP_ATTEMPT_ADDED、APP_ADDED事件的处理逻辑,CapacityScheduler维护了(SchedulerApplication)applications:一个应用程序唯一对应一个FiCaSchedulerApp,而appSchedulingInfo是FiCaSchedulerApp的成员变量,requests又是appSchedulingInfo的成员变量,所以对同一(priority, resourceName)只需维护一份最新的request即可.另外,被覆盖的旧请求(lastRequest)并非直接丢弃,它随后仍会被用来从pending资源统计中扣减(decrPendingResources).