先看这个类的入口main方法:
/**
 * Process entry point for the MapReduce application master.
 *
 * <p>Reads the container id, node-manager host/ports and application submit
 * time from environment variables, builds an {@code MRAppMaster} from them,
 * loads the job configuration from {@code job.xml}, and hands over to
 * {@code initAndStartAppMaster}. Any {@code Throwable} raised along the way is
 * logged at fatal level and the process is terminated with status 1.
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(String args[])
{
    try
    {
        // Register a process-wide default handler for uncaught exceptions.
        Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

        // Names of the environment variables consumed below.
        final String containerIdKey = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CONTAINER_ID.name();
        final String nmHostKey = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.NM_HOST.name();
        final String nmPortKey = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.NM_PORT.name();
        final String nmHttpPortKey = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.NM_HTTP_PORT.name();
        final String submitTimeKey = "APP_SUBMIT_TIME_ENV";

        // Read every value first ...
        String containerIdStr = System.getenv(containerIdKey);
        String nodeHostString = System.getenv(nmHostKey);
        String nodePortString = System.getenv(nmPortKey);
        String nodeHttpPortString = System.getenv(nmHttpPortKey);
        String appSubmitTimeStr = System.getenv(submitTimeKey);

        // ... then pass each one, with its variable name, to validateInputParam.
        validateInputParam(containerIdStr, containerIdKey);
        validateInputParam(nodeHostString, nmHostKey);
        validateInputParam(nodePortString, nmPortKey);
        validateInputParam(nodeHttpPortString, nmHttpPortKey);
        validateInputParam(appSubmitTimeStr, submitTimeKey);

        // Convert the raw strings into the typed values the constructor expects.
        ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
        ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
        long appSubmitTime = Long.parseLong(appSubmitTimeStr);
        int nodePort = Integer.parseInt(nodePortString);
        int nodeHttpPort = Integer.parseInt(nodeHttpPortString);

        // Create the MRAppMaster instance.
        MRAppMaster appMaster = new MRAppMaster(
            applicationAttemptId, containerId, nodeHostString, nodePort, nodeHttpPort, appSubmitTime);

        // Register the shutdown hook (priority 30) before anything is started.
        MRAppMasterShutdownHook shutdownHook = new MRAppMasterShutdownHook(appMaster);
        ShutdownHookManager.get().addShutdownHook(shutdownHook, 30);

        // Build the job configuration: YARN defaults plus the job.xml resource.
        JobConf conf = new JobConf(new YarnConfiguration());
        conf.addResource(new Path("job.xml"));
        MRWebAppUtil.initialize(conf);

        // Record the submitting user's name in the configuration.
        String jobUserName = System.getenv(org.apache.hadoop.yarn.api.ApplicationConstants.Environment.USER.name());
        conf.set("mapreduce.job.user.name", jobUserName);

        // Initialize and start the MRAppMaster.
        initAndStartAppMaster(appMaster, conf, jobUserName);
    }
    catch(Throwable t)
    {
        LOG.fatal("Error starting MRAppMaster", t);
        ExitUtil.terminate(1, t);
    }
}
需要注意MRAppMaster有两个构造器
/**
 * Creates an application master that uses a freshly created
 * {@code SystemClock}.
 *
 * <p>Pure delegation: every argument is forwarded unchanged to the
 * seven-argument constructor, with {@code new SystemClock()} supplied as the
 * clock.
 *
 * @param applicationAttemptId id of the application attempt this master serves
 * @param containerId id of the container this master runs in
 * @param nmHost node-manager host
 * @param nmPort node-manager port
 * @param nmHttpPort node-manager HTTP port
 * @param appSubmitTime application submit time
 */
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
    ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
    long appSubmitTime) {
  this(
      applicationAttemptId,
      containerId,
      nmHost,
      nmPort,
      nmHttpPort,
      new SystemClock(),
      appSubmitTime);
}
/**
 * Primary constructor; the six-argument constructor delegates to this one.
 *
 * <p>Names the service after the {@code MRAppMaster} class, stores the
 * arguments in the corresponding fields, takes the start time from the given
 * clock, creates the metrics object and the log syncer, and logs the attempt
 * id.
 *
 * @param applicationAttemptId id of the application attempt this master serves
 * @param containerId id of the container this master runs in
 * @param nmHost node-manager host
 * @param nmPort node-manager port
 * @param nmHttpPort node-manager HTTP port
 * @param clock clock used to obtain the start time
 * @param appSubmitTime application submit time
 */
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
    ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
    Clock clock, long appSubmitTime) {
  super(MRAppMaster.class.getName());

  // Identity of this attempt and of the container hosting it.
  this.appAttemptID = applicationAttemptId;
  this.containerID = containerId;

  // Node-manager coordinates.
  this.nmHost = nmHost;
  this.nmPort = nmPort;
  this.nmHttpPort = nmHttpPort;

  // Timing information.
  this.appSubmitTime = appSubmitTime;
  this.clock = clock;
  this.startTime = clock.getTime();

  // Helper objects created by their static factories.
  this.metrics = MRAppMetrics.create();
  this.logSyncer = TaskLog.createLogSyncer();

  LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
初始化并启动的方法,实际上分别调用了appMaster的init和start
/**
 * Initializes and starts the given application master as the job user.
 *
 * <p>The credentials of the current user are logged token by token and added
 * to a remote-user UGI created for {@code jobUserName}. After that, every
 * token whose kind equals {@code AMRMTokenIdentifier.KIND_NAME} is removed and
 * the remaining credentials are added to {@code conf}. Finally
 * {@code appMaster.init(conf)} and {@code appMaster.start()} run inside
 * {@code doAs} on that UGI.
 *
 * @param appMaster the application master to initialize and start
 * @param conf job configuration; receives the filtered credentials
 * @param jobUserName user the application master runs as
 * @throws IOException declared by the signature; the privileged action itself
 *     throws one with message "Was asked to shut down." when
 *     {@code appMaster.errorHappenedShutDown} is set after start
 * @throws InterruptedException declared by the signature
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  UserGroupInformation.setConfiguration(conf);

  // The security framework has already loaded the tokens into the current
  // UGI, so they are simply reused here.
  final Credentials currentCredentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens:");
  for (Token<?> loadedToken : currentCredentials.getAllTokens()) {
    LOG.info(loadedToken);
  }

  // The application-master UGI is handed the credentials before the
  // AM->RM token is stripped below.
  final UserGroupInformation appMasterUgi =
      UserGroupInformation.createRemoteUser(jobUserName);
  appMasterUgi.addCredentials(currentCredentials);

  // Drop the AM->RM token so that tasks do not get it.
  for (Iterator<Token<?>> tokens = currentCredentials.getAllTokens().iterator();
      tokens.hasNext();) {
    Token<?> candidate = tokens.next();
    if (candidate.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      tokens.remove();
    }
  }
  conf.getCredentials().addAll(currentCredentials);

  PrivilegedExceptionAction<Object> startAction =
      new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          appMaster.init(conf);
          appMaster.start();
          if (appMaster.errorHappenedShutDown) {
            throw new IOException("Was asked to shut down.");
          }
          return null;
        }
      };
  appMasterUgi.doAs(startAction);
}
真正的初始化和启动方法
- appMaster的初始化
/**
 * Initializes this service with the given configuration.
 *
 * <p>Returns immediately when the service is already in the INITED state.
 * Otherwise, under {@code stateChangeLock}, it calls
 * {@code enterState(STATE.INITED)}; unless that call reports INITED, the
 * configuration is stored and {@code serviceInit} is invoked. Listeners are
 * notified only if the service is still in INITED afterwards.
 *
 * @param conf configuration to initialize with; must not be {@code null}
 * @throws ServiceStateException if {@code conf} is {@code null}, or wrapping
 *     any exception raised during initialization (the failure is noted and
 *     the service is stopped quietly first)
 */
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
        + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    // Nothing to do when enterState reports INITED for this transition.
    if (enterState(STATE.INITED) == STATE.INITED) {
      return;
    }
    setConfig(conf);
    try {
      serviceInit(config);
      // Only notify if the service is still INITED once serviceInit returns.
      if (isInState(STATE.INITED)) {
        notifyListeners();
      }
    } catch (Exception e) {
      noteFailure(e);
      ServiceOperations.stopQuietly(LOG, this);
      throw ServiceStateException.convert(e);
    }
  }
}
- service的初始化
/**
 * Service-specific initialization of the application master.
 *
 * <p>In order: creates the job classloader, initializes job credentials and
 * UGI, builds the running-app context, derives the job id from the
 * application id, decides whether the new-API committer is used, creates the
 * output committer, and probes the file system for the staging directory and
 * the commit marker files. Unless {@code errorHappenedShutDown} is set, it
 * then creates the dispatcher and the individual services, registers the
 * event handlers with the dispatcher, and finally delegates to
 * {@code super.serviceInit(conf)}.
 *
 * <p>NOTE: this listing is abridged; lines consisting of {@code ...} stand
 * for code that is not shown here.
 *
 * @param conf configuration to initialize from; several keys are also
 *     written to it
 * @throws Exception declared by the signature; an {@code IOException} from
 *     the file-system probing is rethrown as {@code YarnRuntimeException}
 */
protected void serviceInit(final Configuration conf) throws Exception {
// create the job classloader if enabled
createJobClassLoader(conf);
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
initJobCredentialsAndUGI(conf);
context = new RunningAppContext(conf);
// Job name is the same as the app name until we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
newApiCommitter = false;
// the job id is built from the application id and that id's numeric part
jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
appAttemptID.getApplicationId().getId());
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
// with reducers the reducer new-api flag decides; for a map-only job the
// mapper new-api flag decides
if ((numReduceTasks > 0 &&
conf.getBoolean("mapred.reducer.new-api", false)) ||
(numReduceTasks == 0 &&
conf.getBoolean("mapred.mapper.new-api", false))) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
boolean copyHistory = false;
committer = createOutputCommitter(conf);
try {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
// probe the staging directory and the three commit marker files
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
FileSystem fs = getFileSystem(conf);
boolean stagingExists = fs.exists(stagingDir);
Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
boolean commitStarted = fs.exists(startCommitFile);
Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
boolean commitSuccess = fs.exists(endCommitSuccessFile);
Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
boolean commitFailure = fs.exists(endCommitFailureFile);
// handle the abnormal cases (branch bodies are elided in this listing)
if(!stagingExists) {
...
} else if (commitStarted) {
...
} else {
...
}
// NOTE(review): the next two closing braces have no visible opening brace;
// they look like leftovers of the elided code - confirm against the full source
}
}
} catch (IOException e) {
throw new YarnRuntimeException("Error while initializing", e);
}
if (errorHappenedShutDown) {
...
} else {
// the normal startup flow follows
dispatcher = createDispatcher();
addIfService(dispatcher);
//service to handle requests from JobClient
clientService = createClientService(context);
// Init ClientService separately so that we stop it separately, since this
// service needs to wait some time before it stops so clients can know the
// final states
clientService.init(conf);
containerAllocator = createContainerAllocator(clientService, context);
//service to handle the output committer
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context);
addIfService(taskAttemptListener);
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
historyService);
this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(CommitterEventType.class, committerEventHandler);
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
//optional service to speculate on task attempts' progress
speculator = createSpeculator(conf, context);
addIfService(speculator);
}
speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
dispatcher.register(Speculator.EventType.class,
speculatorEventDispatcher);
addService(createStagingDirCleaningService());
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
containerLauncher = createContainerLauncher(context);
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
// the history service was registered with the dispatcher above but is only
// added as a child service here, after all the others
// NOTE(review): presumably so that it is stopped first on shutdown - confirm
addIfService(historyService);
}
super.serviceInit(conf);
} // end of serviceInit()
这当中创建了许多服务,并通过addIfService方法添加。需要注意这个方法继承自父类CompositeService,
或许这是继承它的原因之一。
关于这些服务的详解可见Yarn之MRAppMaster详解
下面是启动的方法
/**
 * Starts this service.
 *
 * <p>Returns immediately when the service is already in the STARTED state.
 * Otherwise, under {@code stateChangeLock}, it calls
 * {@code stateModel.enterState(STATE.STARTED)}; unless that call reports
 * STARTED, the start time is recorded and {@code serviceStart} is invoked.
 * Listeners are notified only if the service is still in STARTED afterwards.
 *
 * @throws ServiceStateException wrapping any exception raised while starting
 *     (the failure is noted and the service is stopped quietly first)
 */
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  synchronized (stateChangeLock) {
    // Nothing to do when the state model reports STARTED for this transition.
    if (stateModel.enterState(STATE.STARTED) == STATE.STARTED) {
      return;
    }
    try {
      startTime = System.currentTimeMillis();
      serviceStart();
      // Only log and notify if the service is still STARTED (and has not
      // moved to a later state) once serviceStart returns.
      if (isInState(STATE.STARTED)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Service " + getName() + " is started");
        }
        notifyListeners();
      }
    } catch (Exception e) {
      noteFailure(e);
      ServiceOperations.stopQuietly(LOG, this);
      throw ServiceStateException.convert(e);
    }
  }
}