AddApplicationToSchedulerTransition(基于hadoop 2.7.6)
主要逻辑是触发了SchedulerEventType.APP_ADDED事件,该事件会由CapacityScheduler.handle处理.
AddApplicationToSchedulerTransition转换处理器后RMApp状态由NEW_SAVING变为SUBMITTED.
单纯,直接上代码清单:
private static final class AddApplicationToSchedulerTransition extends
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user,
app.submissionContext.getReservationID()));
}
}
那么接下来是要看AppAddedSchedulerEvent(SchedulerEventType.APP_ADDED)事件对应的处理器是哪个?
从上面的transition方法可以看到app是RMAppImpl,去翻RMAppImpl的构造函数,可以知道它的dispatcher和handler都是从ResourceManager上下文rmContext中提取出来的,dispatcher是AysncDispatcher,handler是根据事件类型适配的。所以顺着思路我们到ResourceManager中找到APP_ADDED事件对应的处理器.
见ResourceManager中RMActiveServices的serviceInit方法:
/**
RMActiveServices是随着ResourceManager一起初始化的
*/
@Private
public class RMActiveServices extends CompositeService {
...省略...
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher)
}
...省略...
SchedulerEventType类型的事件由SchedulerEventDispatcher调度器调度.
那么再看SchedulerEventDispatcher的实现,其中handle方法:
@Override
/**
* 将申请资源的事件加入eventQueue队列,由run方法去消费处理,做真正的资源分配申请
*/
public void handle(SchedulerEvent event) {
try {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of scheduler event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on scheduler event queue: "
+ remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
}
}
handle方法主要是将事件加入eventQueue队列,由run方法去消费处理.这里明显是一个生产者消费者设计模式.消费者呢,见下面代码清单:
private final class EventProcessor implements Runnable {
@Override
public void run() {
SchedulerEvent event;
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
}
try {
scheduler.handle(event);
} catch (Throwable t) {
if (stopped) {
LOG.warn("Exception during shutdown: ", t);
break;
}
LOG.fatal("Error in handling event type " + event.getType()
+ " to the scheduler", t);
if (shouldExitOnError
&& !ShutdownHookManager.get().isShutdownInProgress()) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
}
消费是由内部类EventProcessor完成.EventProcessor线程类是由SchedulerEventDispatcher.serviceStart启动.主要消费逻辑是交给了scheduler.handle(event).scheduler默认情况下是CapacityScheduler,
YarnConfiguration.DEFAULT_RM_SCHEDULER值是
“org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler”
`,是在ResourceManager初始化时实例化:
@Override
protected void RMActiveServics.serviceInit(Configuration configuration) throws Exception {
...省略...
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
... 省略...
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler);
}
protected ResourceScheduler createScheduler() {
String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
YarnConfiguration.DEFAULT_RM_SCHEDULER);
LOG.info("Using Scheduler: " + schedulerClassName);
try {
Class<?> schedulerClazz = Class.forName(schedulerClassName);
if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
this.conf);
} else {
throw new YarnRuntimeException("Class: " + schedulerClassName
+ " not instance of " + ResourceScheduler.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Scheduler: "
+ schedulerClassName, e);
}
}
到这里我们可以明确到CapacityScheduler里定位SchedulerEventType.APP_ADDED事件的处理逻辑.