The previous article stopped abruptly just as the ApplicationMaster was about to be scheduled; this article picks up from there.
private static final class AddApplicationToSchedulerTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
        app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
            app.submissionContext.getQueue(), app.user,
            app.submissionContext.getReservationID()));
    }
}
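The previous article ended with the code above. The app.handler in it is actually the event handler of rmDispatcher, on which this registration is made: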
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
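The event created here has a type of SchedulerEventType, so it is handled by schedulerDispatcher. Tracing further, the work is done by the default scheduler, CapacityScheduler. The event's constructor is: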
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, String user,
        boolean isAppRecovering, ReservationId reservationID) {
    super(SchedulerEventType.APP_ADDED);
    this.applicationId = applicationId;
    this.queue = queue;
    this.user = user;
    this.reservationID = reservationID;
    this.isAppRecovering = isAppRecovering;
}
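The constructor fixes the event type as SchedulerEventType.APP_ADDED. In CapacityScheduler's handle method, that type is matched here: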
case APP_ADDED: {
    AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
    String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
        appAddedEvent.getApplicationId(), appAddedEvent.getReservationID());
    if (queueName != null) {
        addApplication(appAddedEvent.getApplicationId(), queueName,
            appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
    }
    break;
}
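To make the routing above concrete, here is a minimal toy sketch, not YARN code: a dispatcher that picks a handler by the enum class that declares an event's type, which is the idea behind rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher). ToyDispatcher, its two enums and the Event record are hypothetical; the real AsyncDispatcher also queues events and handles them on a separate thread instead of calling handlers inline.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy model, not YARN code: route an event to the handler registered for the
// enum class that declares its type.
class ToyDispatcher {
    enum SchedulerEventType { APP_ADDED, APP_ATTEMPT_ADDED }
    enum RMAppEventType { APP_ACCEPTED, APP_RUNNING_ON_NODE }

    // A toy event carries only its type; real YARN events also carry payloads.
    record Event(Enum<?> type) {}

    private final Map<Class<?>, Consumer<Event>> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Event> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Called inline here; AsyncDispatcher would enqueue and hand off to a thread.
    void dispatch(Event event) {
        Class<?> typeClass = event.type().getDeclaringClass(); // APP_ADDED -> SchedulerEventType
        Consumer<Event> handler = handlers.get(typeClass);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + typeClass);
        }
        handler.accept(event);
    }

    public static void main(String[] args) {
        ToyDispatcher rmDispatcher = new ToyDispatcher();
        rmDispatcher.register(SchedulerEventType.class,
            e -> System.out.println("scheduler handles " + e.type()));
        rmDispatcher.register(RMAppEventType.class,
            e -> System.out.println("RMApp handles " + e.type()));
        rmDispatcher.dispatch(new Event(SchedulerEventType.APP_ADDED));
        rmDispatcher.dispatch(new Event(RMAppEventType.APP_ACCEPTED));
    }
}
```

Keying on the enum class rather than on each constant is what lets one registration cover every SchedulerEventType value.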
Here is the handling code in addApplication; only the important parts are excerpted:
// Submit to the queue
try {
    queue.submitApplication(applicationId, user, queueName);
} catch (AccessControlException ace) {
    // Ignore the exception for recovered app as the app was previously accepted
    if (!isAppRecovering) {
        LOG.info("Failed to submit application " + applicationId + " to queue "
            + queueName + " from user " + user, ace);
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
        return;
    }
}
// update the metrics
queue.getMetrics().submitApp(user);
SchedulerApplication<FiCaSchedulerApp> application =
    new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
    + ", in queue: " + queueName);
if (isAppRecovering) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
    }
} else {
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
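The control flow of this excerpt (reject on an ACL failure unless recovering, register the app, then notify APP_ACCEPTED unless recovering) can be modelled in a few lines. This is a hypothetical toy: ToyAddApplication and its queueAcl map stand in for queue.submitApplication() throwing AccessControlException, and events are recorded as strings instead of being dispatched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the addApplication excerpt above; all names are hypothetical.
// queueAcl stands in for queue.submitApplication() throwing AccessControlException.
class ToyAddApplication {
    final Map<String, String> applications = new HashMap<>(); // appId -> queue
    final List<String> emittedEvents = new ArrayList<>();      // what the RMApp would receive
    private final Map<String, Set<String>> queueAcl;           // queue -> users allowed to submit

    ToyAddApplication(Map<String, Set<String>> queueAcl) {
        this.queueAcl = queueAcl;
    }

    void addApplication(String appId, String queue, String user, boolean isAppRecovering) {
        boolean allowed = queueAcl.getOrDefault(queue, Set.of()).contains(user);
        if (!allowed && !isAppRecovering) {
            emittedEvents.add("APP_REJECTED:" + appId); // RMAppRejectedEvent
            return;
        }
        // A recovered app was accepted before, so an ACL failure is ignored for it.
        applications.put(appId, queue);
        if (!isAppRecovering) {
            emittedEvents.add("APP_ACCEPTED:" + appId); // drives SUBMITTED -> ACCEPTED
        } // recovering apps skip the APP_ACCEPTED notification
    }

    public static void main(String[] args) {
        ToyAddApplication scheduler = new ToyAddApplication(Map.of("default", Set.of("alice")));
        scheduler.addApplication("app_1", "default", "alice", false);
        scheduler.addApplication("app_2", "default", "bob", false);
        System.out.println(scheduler.emittedEvents);
    }
}
```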
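The application is first submitted to its queue; then the dispatcher inside rmContext, which is the RM's central (global) dispatcher, dispatches the APP_ACCEPTED event. Looking up that event type, we find the handler in RMAppImpl's state machine: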
addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
        app.createAndStartNewAttempt(false);
    }
}
private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    createNewAttempt();
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
        transferStateFromPreviousAttempt));
}
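Here another event, RMAppStartAttemptEvent, is submitted to start an ApplicationMaster attempt. A single launch may not succeed, so each try is modelled as a separate attempt, which leaves room for retries.
We focus on the two calls inside this last method, starting with createNewAttempt():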
private void createNewAttempt() {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
    RMAppAttempt attempt =
        new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
            submissionContext, conf,
            // The newly created attempt maybe last attempt if (number of
            // previously failed attempts(which should not include Preempted,
            // hardware error and NM resync) + 1) equal to the max-attempt
            // limit.
            maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
    attempts.put(appAttemptId, attempt);
    currentAttempt = attempt;
}
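Creating an attempt takes many environment parameters. They gather what the ApplicationMaster needs: the scheduler, the ApplicationMasterService (masterService), the submission context, the configuration and the AM resource request, all closely tied to launching the ApplicationMaster. The attempt number is attempts.size() + 1, and the attempt is flagged as possibly the last one when the number of previously failed attempts plus one equals maxAppAttempts.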
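The numbering and "last attempt" rule can be checked with a toy model. ToyAttempts is hypothetical; it only reproduces the two expressions from createNewAttempt(), and the countsAsFailure flag is an assumption standing in for the source comment's rule that preemption, hardware errors and NM resync do not count as failures.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of createNewAttempt(); names are hypothetical. Attempt numbers are
// attempts.size() + 1, and an attempt may be the last one when
// maxAppAttempts == failedAttempts + 1.
class ToyAttempts {
    record Attempt(int attemptNumber, boolean isLastAttempt) {}

    private final int maxAppAttempts;
    private final Map<Integer, Attempt> attempts = new LinkedHashMap<>();
    private int numFailedAttempts; // preemption, hardware errors, NM resync are not counted
    Attempt currentAttempt;

    ToyAttempts(int maxAppAttempts) {
        this.maxAppAttempts = maxAppAttempts;
    }

    Attempt createNewAttempt() {
        int attemptNumber = attempts.size() + 1;
        Attempt attempt = new Attempt(attemptNumber, maxAppAttempts == numFailedAttempts + 1);
        attempts.put(attemptNumber, attempt);
        currentAttempt = attempt;
        return attempt;
    }

    // countsAsFailure == false models a preempted / hardware-error / NM-resync attempt
    void currentAttemptFailed(boolean countsAsFailure) {
        if (countsAsFailure) {
            numFailedAttempts++;
        }
    }

    public static void main(String[] args) {
        ToyAttempts app = new ToyAttempts(2);
        System.out.println(app.createNewAttempt());
        app.currentAttemptFailed(true);
        System.out.println(app.createNewAttempt());
    }
}
```

Note that the attempt number keeps growing even when a failure is not counted, so the attempt number and the failure count can diverge.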
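The second call submits yet another event, RMAppStartAttemptEvent, whose type is RMAppAttemptEventType.START. Events of this type are dispatched as follows: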
rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(rmContext));
addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition())
private static final class AttemptStartedTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
        boolean transferStateFromPreviousAttempt = false;
        if (event instanceof RMAppStartAttemptEvent) {
            transferStateFromPreviousAttempt = ((RMAppStartAttemptEvent) event)
                .getTransferStateFromPreviousAttempt();
        }
        appAttempt.startTime = System.currentTimeMillis();

        // Register with the ApplicationMasterService
        appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId);

        if (UserGroupInformation.isSecurityEnabled()) {
            appAttempt.clientTokenMasterKey = appAttempt.rmContext
                .getClientToAMTokenSecretManager()
                .createMasterKey(appAttempt.applicationAttemptId);
        }

        // Add the applicationAttempt to the scheduler and inform the scheduler
        // whether to transfer the state from previous attempt.
        appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
            appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }
}
Let's look at this event:
public AppAttemptAddedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
        boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) {
    super(SchedulerEventType.APP_ATTEMPT_ADDED);
    this.applicationAttemptId = applicationAttemptId;
    this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
    this.isAttemptRecovering = isAttemptRecovering;
}
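Clearly, the submitted event has type SchedulerEventType.APP_ATTEMPT_ADDED. Let's follow its handling step by step, starting with where the attempt's eventHandler comes from: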
this.eventHandler = rmContext.getDispatcher().getEventHandler();
So the event is handed to the central dispatcher, where SchedulerEventType is registered to:
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
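Based on the event type, the handler is the RM's internal SchedulerEventDispatcher. Its handling logic actually runs in an inner EventProcessor, a class implementing Runnable, so that events are processed asynchronously. EventProcessor eventually calls: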
scheduler.handle(event);
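The asynchronous EventProcessor pattern just described can be sketched with a plain LinkedBlockingQueue. ToySchedulerEventDispatcher, the String events and the stop sentinel are hypothetical; the sketch only shows why handle() returns immediately while the scheduler's handle(event) runs later on another thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Toy model of an asynchronous scheduler event dispatcher; names are hypothetical.
// handle() only enqueues; the EventProcessor thread drains the queue and calls
// the scheduler, so the caller (the central dispatcher) is never blocked.
class ToySchedulerEventDispatcher {
    private static final String STOP = "__STOP__"; // sentinel that ends the loop
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final Consumer<String> scheduler;
    private final Thread eventProcessor;

    private final class EventProcessor implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    String event = eventQueue.take(); // blocks until an event arrives
                    if (STOP.equals(event)) {
                        return;
                    }
                    scheduler.accept(event); // i.e. scheduler.handle(event)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    ToySchedulerEventDispatcher(Consumer<String> scheduler) {
        this.scheduler = scheduler;
        this.eventProcessor = new Thread(new EventProcessor(), "ToyEventProcessor");
        this.eventProcessor.start();
    }

    void handle(String event) {
        eventQueue.add(event);
    }

    void stopAndJoin() {
        eventQueue.add(STOP);
        try {
            eventProcessor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> handled = new CopyOnWriteArrayList<>();
        ToySchedulerEventDispatcher dispatcher = new ToySchedulerEventDispatcher(handled::add);
        dispatcher.handle("APP_ADDED");
        dispatcher.handle("APP_ATTEMPT_ADDED");
        dispatcher.stopAndJoin();
        System.out.println(handled);
    }
}
```

Because a single thread drains the queue, events reach the scheduler in submission order.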
The scheduler here is the RM-wide CapacityScheduler. Let's see how it handles this event:
case APP_ATTEMPT_ADDED: {
    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
        (AppAttemptAddedSchedulerEvent) event;
    addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
    break;
}
This delegates to addApplicationAttempt, which, as we will see, goes on to fire another event for further dispatching. Let's look at that method:
SchedulerApplication<FiCaSchedulerApp> application =
    applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();

FiCaSchedulerApp attempt =
    new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
        queue, queue.getActiveUsersManager(), rmContext);
if (transferStateFromPreviousAttempt) {
    attempt.transferStateFromPreviousAttempt(application.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);

queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
    + " to scheduler from user " + application.getUser() + " in queue "
    + queue.getQueueName());
if (isAttemptRecovering) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(applicationAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
    }
} else {
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
}
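We won't analyze the earlier code. Focus on how dispatching continues: yet another event, of type RMAppAttemptEventType.ATTEMPT_ADDED, is submitted to the central dispatcher, and it is handled by: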
rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(rmContext));
Let's see how ApplicationAttemptEventDispatcher handles this event:
ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
ApplicationId appAttemptId = appAttemptID.getApplicationId();
RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);
if (rmApp != null) {
    RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
    if (rmAppAttempt != null) {
        try {
            rmAppAttempt.handle(event);
        } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " for applicationAttempt " + appAttemptId, t);
        }
    }
}
Here rmAppAttempt is actually an RMAppAttemptImpl. Let's step into its handle method:
this.writeLock.lock();
try {
    ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
    LOG.debug("Processing event for " + appAttemptID + " of type "
        + event.getType());
    final RMAppAttemptState oldState = getAppAttemptState();
    try {
        /* keep the master in sync with the state machine */
        this.stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state", e);
        /* TODO fail the application on the failed transition */
    }

    if (oldState != getAppAttemptState()) {
        LOG.info(appAttemptID + " State change from " + oldState + " to "
            + getAppAttemptState());
    }
} finally {
    this.writeLock.unlock();
}
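The shape of this handle method (take the write lock, remember the old state, run the transition, log an invalid transition instead of failing, and report a change only if the state moved) can be reproduced with a tiny table-driven state machine. ToyAttemptHandle and its enums are hypothetical and support only single-arc transitions; YARN's StateMachineFactory is considerably richer.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy model of the RMAppAttemptImpl.handle() pattern; names are hypothetical.
class ToyAttemptHandle {
    enum State { NEW, SUBMITTED, SCHEDULED }
    enum EventType { START, ATTEMPT_ADDED }

    // (current state, event type) -> next state; single-arc transitions only
    private final Map<State, Map<EventType, State>> table = new EnumMap<>(State.class);
    private final ReentrantReadWriteLock.WriteLock writeLock =
        new ReentrantReadWriteLock().writeLock();
    private State state = State.NEW;

    ToyAttemptHandle() {
        addTransition(State.NEW, EventType.START, State.SUBMITTED);
        addTransition(State.SUBMITTED, EventType.ATTEMPT_ADDED, State.SCHEDULED);
    }

    private void addTransition(State from, EventType on, State to) {
        table.computeIfAbsent(from, k -> new EnumMap<EventType, State>(EventType.class))
            .put(on, to);
    }

    State getState() {
        return state;
    }

    void handle(EventType type) {
        writeLock.lock();
        try {
            State oldState = state;
            State next = table.getOrDefault(state, Map.of()).get(type);
            if (next == null) {
                // Like the InvalidStateTransitonException branch: log, keep the state
                System.out.println("Can't handle " + type + " at current state " + state);
            } else {
                state = next;
            }
            if (oldState != state) {
                System.out.println("State change from " + oldState + " to " + state);
            }
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        ToyAttemptHandle attempt = new ToyAttemptHandle();
        attempt.handle(EventType.ATTEMPT_ADDED); // invalid in NEW: logged, state kept
        attempt.handle(EventType.START);
        attempt.handle(EventType.ATTEMPT_ADDED);
    }
}
```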
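Here the event drives a state machine transition. The attempt's current state is RMAppAttemptState.SUBMITTED (set by AttemptStartedTransition), and the incoming event is RMAppAttemptEventType.ATTEMPT_ADDED, which matches this transition: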
addTransition(RMAppAttemptState.SUBMITTED, EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.ATTEMPT_ADDED, new ScheduleTransition())
Let's look at the code of ScheduleTransition:
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
    ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
    if (!subCtx.getUnmanagedAM()) {
        // Need reset #containers before create new attempt, because this request
        // will be passed to scheduler, and scheduler will deduct the number after
        // AM container allocated

        // Currently, following fields are all hard code,
        // TODO: change these fields when we want to support
        // priority/resource-name/relax-locality specification for AM containers
        // allocation.
        appAttempt.amReq.setNumContainers(1);
        appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
        appAttempt.amReq.setResourceName(ResourceRequest.ANY);
        appAttempt.amReq.setRelaxLocality(true);

        // AM resource has been checked when submission
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
                Collections.singletonList(appAttempt.amReq),
                EMPTY_CONTAINER_RELEASE_LIST, null, null);
        if (amContainerAllocation != null
                && amContainerAllocation.getContainers() != null) {
            assert (amContainerAllocation.getContainers().size() == 0);
        }
        return RMAppAttemptState.SCHEDULED;
    } else {
        // save state and then go to LAUNCHED state
        appAttempt.storeAttempt();
        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
    }
}
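This is the key part: once scheduling starts, the RM requests resources for the AM and prepares to launch the attempt in the container it will be given. For an unmanaged AM, no container is requested; the attempt is stored and moves to LAUNCHED_UNMANAGED_SAVING instead.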
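ScheduleTransition is a multiple-arc transition: the registration lists two allowed target states and the transition picks one at run time. The toy below is hypothetical (ToyScheduleTransition and the AmRequest record are not YARN types); it keeps only the numContainers, resource name and relax-locality settings from the excerpt, using "*", the value of ResourceRequest.ANY.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

// Toy model of ScheduleTransition as a multiple-arc transition; names are
// hypothetical. The target state is chosen at run time from an allowed set.
class ToyScheduleTransition {
    enum State { SUBMITTED, SCHEDULED, LAUNCHED_UNMANAGED_SAVING }

    // Stand-in for the AM ResourceRequest fields that the real transition hard-codes
    record AmRequest(int numContainers, String resourceName, boolean relaxLocality) {}

    static final EnumSet<State> ALLOWED_TARGETS =
        EnumSet.of(State.LAUNCHED_UNMANAGED_SAVING, State.SCHEDULED);

    final List<AmRequest> requestsSentToScheduler = new ArrayList<>();
    boolean attemptStored;

    State transition(boolean unmanagedAM) {
        State next;
        if (!unmanagedAM) {
            // One AM container, anywhere ("*" is the value of ResourceRequest.ANY)
            requestsSentToScheduler.add(new AmRequest(1, "*", true));
            next = State.SCHEDULED; // the container itself arrives later
        } else {
            attemptStored = true; // storeAttempt(); no AM container is requested
            next = State.LAUNCHED_UNMANAGED_SAVING;
        }
        if (!ALLOWED_TARGETS.contains(next)) {
            throw new IllegalStateException("Unexpected target state " + next);
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(new ToyScheduleTransition().transition(false));
        System.out.println(new ToyScheduleTransition().transition(true));
    }
}
```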
Let's look at the resource allocation call:
Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, Collections.singletonList(appAttempt.amReq), EMPTY_CONTAINER_RELEASE_LIST, null, null);
Tracing into allocate(), the key lines are:
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
return application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability());
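Skipping the earlier checks and container releases, look straight at these two lines: the actual work is delegated to the FiCaSchedulerApp that represents this attempt inside the scheduler. This logic is somewhat involved, so let's go through it thoroughly.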
Earlier, we called CapacityScheduler's addApplication method, which contains these two lines:
SchedulerApplication<FiCaSchedulerApp> application =
    new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application);
Afterwards, we called addApplicationAttempt, which creates a FiCaSchedulerApp:
SchedulerApplication<FiCaSchedulerApp> application =
    applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();

FiCaSchedulerApp attempt =
    new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
        queue, queue.getActiveUsersManager(), rmContext);
if (transferStateFromPreviousAttempt) {
    attempt.transferStateFromPreviousAttempt(application.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
This is the FiCaSchedulerApp used later on. Here is its constructor:
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
        String user, Queue queue, ActiveUsersManager activeUsersManager,
        RMContext rmContext) {
    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);

    RMApp rmApp = rmContext.getRMApps().get(getApplicationId());

    Resource amResource;
    if (rmApp == null || rmApp.getAMResourceRequest() == null) {
        // the rmApp may be undefined (the resource manager checks for this too)
        // and unmanaged applications do not provide an amResource request
        // in these cases, provide a default using the scheduler
        amResource = rmContext.getScheduler().getMinimumResourceCapability();
    } else {
        amResource = rmApp.getAMResourceRequest().getCapability();
    }

    setAMResource(amResource);
}
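Note in this constructor that if the RMApp cannot be found or carries no AM resource request (as with an unmanaged AM), the scheduler's minimum resource capability is used as the AM resource, which is why this minimum is usually worth configuring. The minimum is read through the CapacityScheduler's configuration: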
public Resource getMinimumAllocation() {
    int minimumMemory = getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    int minimumCores = getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
    return Resources.createResource(minimumMemory, minimumCores);
}
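Tracing the defaults, the minimum memory is 1024 MB and the minimum number of vcores is 1. Because these values come from the YARN configuration, the defaults can be overridden by setting yarn.scheduler.minimum-allocation-mb and yarn.scheduler.minimum-allocation-vcores in yarn-site.xml.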
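The fallback from the AM resource request to the scheduler minimum can be sketched without Hadoop on the classpath. In this toy, a plain Map replaces the Hadoop Configuration and a null request models "rmApp == null || rmApp.getAMResourceRequest() == null"; ToyMinimumAllocation and its Resource record are hypothetical, while the two property names and their defaults (1024 MB, 1 vcore) are the real yarn.scheduler.minimum-allocation-* settings.

```java
import java.util.Map;

// Toy model of the AM resource fallback; names are hypothetical, but the keys and
// defaults match the yarn.scheduler.minimum-allocation-* properties.
class ToyMinimumAllocation {
    record Resource(int memoryMb, int vcores) {}

    // Mirrors getMinimumAllocation(): read both keys, falling back to the defaults
    static Resource getMinimumAllocation(Map<String, String> conf) {
        int memoryMb = Integer.parseInt(
            conf.getOrDefault("yarn.scheduler.minimum-allocation-mb", "1024"));
        int vcores = Integer.parseInt(
            conf.getOrDefault("yarn.scheduler.minimum-allocation-vcores", "1"));
        return new Resource(memoryMb, vcores);
    }

    // amRequest == null models "rmApp == null || rmApp.getAMResourceRequest() == null"
    static Resource amResource(Resource amRequest, Map<String, String> conf) {
        return amRequest != null ? amRequest : getMinimumAllocation(conf);
    }

    public static void main(String[] args) {
        System.out.println(amResource(null, Map.of()));
        System.out.println(amResource(null, Map.of("yarn.scheduler.minimum-allocation-mb", "512")));
        System.out.println(amResource(new Resource(2048, 2), Map.of()));
    }
}
```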
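Note that once the last line above (the getAllocation call) completes, ScheduleTransition returns and the RMAppAttemptImpl moves to RMAppAttemptState.SCHEDULED. The end of getAllocation looks like this: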
ContainersAndNMTokensAllocation allocation =
    pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(), getHeadroom(), null,
    currentContPreemption, Collections.singletonList(rr),
    allocation.getNMTokenList());
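Before the new Allocation is returned, one line matters a lot: the call to pullNewlyAllocatedContainersAndNMTokens() shown above. Let's step into it: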
List<Container> returnContainerList =
    new ArrayList<Container>(newlyAllocatedContainers.size());
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i.hasNext();) {
    RMContainer rmContainer = i.next();
    Container container = rmContainer.getContainer();
    try {
        // create container token and NMToken altogether.
        container.setContainerToken(rmContext.getContainerTokenSecretManager()
            .createContainerToken(container.getId(), container.getNodeId(),
                getUser(), container.getResource(), container.getPriority(),
                rmContainer.getCreationTime(), this.logAggregationContext));
        NMToken nmToken =
            rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
                getApplicationAttemptId(), container);
        if (nmToken != null) {
            nmTokens.add(nmToken);
        }
    } catch (IllegalArgumentException e) {
        // DNS might be down, skip returning this container.
        LOG.error("Error trying to assign container token and NM token to"
            + " an allocated container " + container.getId(), e);
        continue;
    }
    returnContainerList.add(container);
    i.remove();
    rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
        RMContainerEventType.ACQUIRED));
}
return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
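The bookkeeping in this loop can be isolated in a toy: a container is returned, removed from the pending list and moved from ALLOCATED to ACQUIRED only when token creation succeeds; on failure it is skipped and stays pending. ToyPullContainers, ToyContainer and the Predicate that simulates token creation are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Toy model of pullNewlyAllocatedContainersAndNMTokens(); names are hypothetical.
class ToyPullContainers {
    enum ContainerState { NEW, ALLOCATED, ACQUIRED }

    static final class ToyContainer {
        final String id;
        // NEW -> ALLOCATED already happened when the scheduler allocated it
        ContainerState state = ContainerState.ALLOCATED;

        ToyContainer(String id) {
            this.id = id;
        }
    }

    final List<ToyContainer> newlyAllocatedContainers = new ArrayList<>();

    // tokenCreationSucceeds simulates creating the container token and NMToken
    List<String> pull(Predicate<ToyContainer> tokenCreationSucceeds) {
        List<String> returned = new ArrayList<>();
        for (Iterator<ToyContainer> i = newlyAllocatedContainers.iterator(); i.hasNext();) {
            ToyContainer c = i.next();
            if (!tokenCreationSucceeds.test(c)) {
                continue; // e.g. DNS down: skip it, the container stays pending
            }
            returned.add(c.id);
            i.remove();
            c.state = ContainerState.ACQUIRED; // the ACQUIRED event in the real code
        }
        return returned;
    }

    public static void main(String[] args) {
        ToyPullContainers app = new ToyPullContainers();
        app.newlyAllocatedContainers.add(new ToyContainer("c1"));
        app.newlyAllocatedContainers.add(new ToyContainer("c2"));
        // Pretend token creation fails for c2
        System.out.println(app.pull(c -> !c.id.equals("c2")));
        System.out.println(app.newlyAllocatedContainers.size());
    }
}
```

Removing through the Iterator is what makes it safe to drop entries from the list while looping over it.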
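Pay close attention to the event submission here: for every RMContainer that is handed out, an RMContainerEventType.ACQUIRED event is fired, and that event is handled by: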
.addTransition(RMContainerState.ALLOCATED, RMContainerState.ACQUIRED, RMContainerEventType.ACQUIRED, new AcquiredTransition())
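Note that this is a transition in RMContainerImpl's state machine. It goes from ALLOCATED to ACQUIRED because, during the earlier allocation, the corresponding RMContainerImpl had already moved from NEW to ALLOCATED. Let's look at what AcquiredTransition does: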
// Tell the app
container.eventHandler.handle(new RMAppRunningOnNodeEvent(
    container.getApplicationAttemptId().getApplicationId(), container.nodeId));
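This is the key part. Here container is the RMContainerImpl, and it calls handle on its own eventHandler, which leads to the RM's central dispatcher. RMAppRunningOnNodeEvent is an RMAppEventType event, and that type is registered as follows: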
// Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext));
The corresponding handling lives in RMAppImpl's state machine:
addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition())
Let's look at this transition:
public void transition(RMAppImpl app, RMAppEvent event) {
    RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;

    // if final state already stored, notify RMNode
    if (isAppInFinalState(app)) {
        app.handler.handle(
            new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(),
                nodeAddedEvent.getApplicationId()));
        return;
    }

    // otherwise, add it to ranNodes for further process
    app.ranNodes.add(nodeAddedEvent.getNodeId());
}
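The application is clearly not in a final state yet, so the second branch runs: the node is added to app.ranNodes, which records the nodes the application has run on. We won't go into it further here.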
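The two branches of AppRunningOnNodeTransition can be sketched as follows. ToyRanNodes is hypothetical: node ids are plain strings, a boolean flag replaces isAppInFinalState(app), and a list of cleanup requests stands in for sending RMNodeCleanAppEvent. Keeping ranNodes as a Set is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of AppRunningOnNodeTransition; names are hypothetical.
class ToyRanNodes {
    final Set<String> ranNodes = new LinkedHashSet<>();     // nodes the app has run on
    final List<String> cleanupRequests = new ArrayList<>(); // stands in for RMNodeCleanAppEvent
    boolean finalStateStored;                               // isAppInFinalState(app)

    void onAppRunningOnNode(String nodeId) {
        if (finalStateStored) {
            // The app is already finished: ask the node to clean up right away
            cleanupRequests.add(nodeId);
            return;
        }
        // Otherwise remember the node; a Set makes repeated events for one node harmless
        ranNodes.add(nodeId);
    }

    public static void main(String[] args) {
        ToyRanNodes app = new ToyRanNodes();
        app.onAppRunningOnNode("nm1:45454");
        app.onAppRunningOnNode("nm1:45454");
        app.finalStateStored = true;
        app.onAppRunningOnNode("nm2:45454");
        System.out.println(app.ranNodes + " " + app.cleanupRequests);
    }
}
```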
Now let's continue from the earlier logic in ScheduleTransition:
// AM resource has been checked when submission
Allocation amContainerAllocation =
    appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
        Collections.singletonList(appAttempt.amReq),
        EMPTY_CONTAINER_RELEASE_LIST, null, null);
if (amContainerAllocation != null
        && amContainerAllocation.getContainers() != null) {
    assert (amContainerAllocation.getContainers().size() == 0);
}
return RMAppAttemptState.SCHEDULED;
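Here the transition returns RMAppAttemptState.SCHEDULED, so the attempt is now in the SCHEDULED state. Leaving SCHEDULED is triggered by a CONTAINER_ALLOCATED event, through this transition: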
addTransition(RMAppAttemptState.SCHEDULED, EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition())
Having traced it back to this point, let's look at the code of AMContainerAllocatedTransition (an excerpt):
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(0));
RMContainerImpl rmMasterContainer = (RMContainerImpl) appAttempt.scheduler
    .getRMContainer(appAttempt.getMasterContainer().getId());
rmMasterContainer.setAMContainer(true);
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.
// When AM container was allocated to RM itself, the node which allocates
// this AM container was marked as the NMToken already sent. Thus,
// clear this node set so that the following allocate requests from AM are
// able to retrieve the corresponding NMToken.
appAttempt.rmContext.getNMTokenSecretManager()
    .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
appAttempt.getSubmissionContext().setResource(
    appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt();
return RMAppAttemptState.ALLOCATED_SAVING;
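As the name suggests, this transition handles the allocation of the AM container. It does not create a new RMContainerImpl: it takes the first allocated container as the master container, fetches the existing RMContainerImpl for it from the scheduler and marks it as the AM container, clears the attempt's NMToken node set, records the container's resource in the submission context, and stores the attempt. From here on, the RM has to work with the NodeManager to launch the ApplicationMaster. The storeAttempt method is: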
private void storeAttempt() {
    // store attempt data in a non-blocking manner to prevent dispatcher
    // thread starvation and wait for state to be saved
    LOG.info("Storing attempt: AppId: " + getAppAttemptId().getApplicationId()
        + " AttemptId: " + getAppAttemptId()
        + " MasterContainer: " + masterContainer);
    rmContext.getStateStore().storeNewApplicationAttempt(this);
}
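This method hands the current RMAppAttemptImpl to the RM's state store (rmContext.getStateStore()), which persists the attempt. As the comment notes, this is done in a non-blocking manner so that the dispatcher thread is not starved.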
Finally, the transition returns the new state RMAppAttemptState.ALLOCATED_SAVING.
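The transition is registered with two possible targets, ALLOCATED_SAVING and SCHEDULED. The toy below models both outcomes under an explicit assumption not shown in the excerpt: when no container has been pulled yet, the attempt stays in SCHEDULED. ToyAmContainerAllocated and its flags are hypothetical stand-ins for setMasterContainer, setAMContainer(true) and storeAttempt().

```java
import java.util.List;

// Toy model of AMContainerAllocatedTransition's two outcomes; names are hypothetical.
class ToyAmContainerAllocated {
    enum State { SCHEDULED, ALLOCATED_SAVING }

    String masterContainer;
    boolean amContainerMarked;
    boolean storeRequested;

    State transition(List<String> pulledContainers) {
        if (pulledContainers.isEmpty()) {
            // Assumption: nothing pulled yet, so stay SCHEDULED (an allowed target)
            return State.SCHEDULED;
        }
        masterContainer = pulledContainers.get(0); // setMasterContainer(...get(0))
        amContainerMarked = true;                  // rmMasterContainer.setAMContainer(true)
        storeRequested = true;                     // storeAttempt(): asks the store, does not wait
        return State.ALLOCATED_SAVING;
    }

    public static void main(String[] args) {
        ToyAmContainerAllocated attempt = new ToyAmContainerAllocated();
        System.out.println(attempt.transition(List.of()));
        System.out.println(attempt.transition(List.of("container_01", "container_02")));
        System.out.println(attempt.masterContainer);
    }
}
```

The "_SAVING" suffix reflects that the attempt waits in this state until the store reports back, which is what the next article follows.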
That concludes this article; the next one will discuss the transition that follows from this new state.