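The Hadoop 2.0 source tree ships two applications built on YARN. One is MapReduce; the other is DistributedShell, a sample program that shows how to write a YARN application. You can think of it as YARN's counterpart to the WordCount example.
As its name suggests, DistributedShell runs shell commands in a distributed fashion: a string of shell commands or a shell script submitted by the user is dispatched, under the control of the ApplicationMaster, to different containers for execution.
The DistributedShell source code is under "hadoop-yarn-project\hadoop-yarn\hadoop-yarn-applications\hadoop-yarn-applications-distributedshell"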
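It covers the three interactions that an application has to implement:
Client and RM (Client.java)
The client submits the application
AM and RM (ApplicationMaster.java)
The AM registers itself and requests containers
AM and NM (ApplicationMaster.java)
The AM launches the containers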
Running it:
hadoop jar hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar -shell_command '/bin/date' -num_containers 10
This starts 10 containers, each of which runs the `date` command.
Execution flow:
1. The client submits the application to the RM through org.apache.hadoop.yarn.applications.distributedshell.Client; it must supply an ApplicationSubmissionContext
2. org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster requests containers and runs the user's command in them; the command is passed in ContainerLaunchContext.commands
Client (Client.java):
1. YarnClient.getNewApplication
2. Fill in the ApplicationSubmissionContext and the ContainerLaunchContext (for the container that launches the AM)
3. YarnClient.submitApplication
4. Call YarnClient.getApplicationReport at regular intervals to get the application status
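Step 4 amounts to a polling loop. The following is a minimal, cluster-free sketch of that idea, not the code in Client.java. AppState is a simplified stand-in for YARN's application states, and monitor, reportSource, intervalMillis and maxPolls are hypothetical names introduced for illustration. In a real client the supplier would wrap a call to YarnClient.getApplicationReport.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;

class ReportPollingSketch {
    // Simplified stand-in for YARN's application states (assumption, not the real enum).
    enum AppState { SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // States after which polling can stop.
    static final Set<AppState> TERMINAL =
            EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    /**
     * Polls reportSource until a terminal state is seen or maxPolls is reached.
     * Returns the last state observed (null if maxPolls is 0).
     */
    static AppState monitor(Supplier<AppState> reportSource, long intervalMillis, int maxPolls) {
        AppState last = null;
        for (int i = 0; i < maxPolls; i++) {
            last = reportSource.get();
            if (TERMINAL.contains(last)) {
                return last;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                return last;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // Canned sequence of reports standing in for successive RM responses.
        Iterator<AppState> canned = Arrays.asList(
                AppState.ACCEPTED, AppState.RUNNING, AppState.FINISHED).iterator();
        System.out.println("final state: " + monitor(canned::next, 10, 10));
    }
}
```

The maxPolls bound stands in for a client-side timeout, so the loop cannot run forever if the application never reaches a terminal state.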
// Create the launch context for the AM container
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// Set the local resources: AppMaster.jar and log4j.properties
amContainer.setLocalResources(localResources);
// Environment variables: HDFS location of the shell script, CLASSPATH
amContainer.setEnvironment(env);
// Build the command and arguments that launch the AM
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add("${JAVA_HOME}" + "/bin/java");
vargs.add("-Xmx" + amMemory + "m");
// AM main class
vargs.add("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
  vargs.add("--shell_command " + shellCommand + "");
}
if (!shellArgs.isEmpty()) {
  vargs.add("--shell_args " + shellArgs + "");
}
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
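// Join the arguments into a single command line
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}
amContainer.setCommands(Collections.singletonList(command.toString()));
// Set the resource requirements; only memory is set for now
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
amContainer.setResource(capability);
appContext.setAMContainerSpec(amContainer);
// Submit the application to the RM
super.submitApplication(appContext);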
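To make the result of the argument list concrete, here is a small standalone sketch that assembles a comparable AM launch command without any Hadoop dependency. AmCommandSketch and buildAmCommand are hypothetical names, only a subset of the options is included, and the "<LOG_DIR>" placeholder is assumed to be the literal value of ApplicationConstants.LOG_DIR_EXPANSION_VAR, which the NodeManager replaces with the container log directory at launch time.

```java
import java.util.ArrayList;
import java.util.List;

class AmCommandSketch {
    // Assumed literal value of ApplicationConstants.LOG_DIR_EXPANSION_VAR.
    static final String LOG_DIR = "<LOG_DIR>";

    /** Builds a single command line from the individual arguments. */
    static String buildAmCommand(int amMemoryMb, int containerMemoryMb,
                                 int numContainers, String shellCommand) {
        List<String> vargs = new ArrayList<>();
        vargs.add("${JAVA_HOME}/bin/java");
        vargs.add("-Xmx" + amMemoryMb + "m");
        vargs.add("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
        vargs.add("--container_memory " + containerMemoryMb);
        vargs.add("--num_containers " + numContainers);
        if (!shellCommand.isEmpty()) {
            vargs.add("--shell_command " + shellCommand);
        }
        // Redirect the AM's stdout and stderr into the container log directory.
        vargs.add("1>" + LOG_DIR + "/AppMaster.stdout");
        vargs.add("2>" + LOG_DIR + "/AppMaster.stderr");
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(buildAmCommand(10, 10, 10, "/bin/date"));
    }
}
```

The point of the sketch is that the AM is started by an ordinary shell command line: ${JAVA_HOME} is left unexpanded so that it is resolved on the node that runs the container, not on the client.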
ApplicationMaster (ApplicationMaster.java):
1. AMRMClient.registerApplicationMaster
2. Pass a ContainerRequest to AMRMClient.addContainerRequest
3. Obtain containers through AMRMClient.allocate
4. Hand each container to a new LaunchContainerRunnable thread
5. Create a ContainerLaunchContext and set localResources, shellCommand, shellArgs and the other container launch information
6. ContainerManager.startContainer(startReq)
7. Read the completed containers from the response to the next RPC call, via AMResponse.getCompletedContainersStatuses
8. AMRMClient.unregisterApplicationMaster
// Create the AMRMClient. 2.1-beta adds an asynchronous AMRMClient; this code still uses the synchronous one
resourceManager = new AMRMClientImpl(appAttemptID);
resourceManager.init(conf);
resourceManager.start();
// Register with the RM
RegisterApplicationMasterResponse response = resourceManager
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);
while (numCompletedContainers.get() < numTotalContainers && !appDone) {
  // Build the container request and set its resource requirements; only memory is set here
  ContainerRequest containerAsk = setupContainerAskForRM(askCount);
  resourceManager.addContainerRequest(containerAsk);
  // Send the request to RM
  LOG.info("Asking RM for containers" + ", askCount=" + askCount);
  AMResponse amResp = sendContainerAskToRM();
  // Retrieve list of allocated containers from the response
  List<Container> allocatedContainers = amResp.getAllocatedContainers();
  for (Container allocatedContainer : allocatedContainers) {
    // Submit the container launch request from a new thread so that the main thread is not blocked
    LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
        allocatedContainer);
    Thread launchThread = new Thread(runnableLaunchContainer);
    launchThreads.add(launchThread);
    launchThread.start();
  }
  List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
}
// Unregister from the RM
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
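The shape of this loop (request, allocate, launch on a separate thread, count completions) can be tried out without a cluster. The sketch below is a simulation under stated assumptions: FakeResourceManager is a hypothetical in-memory stand-in, not a YARN class, containers are plain integers, and each "container" finishes as soon as its launcher thread runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class AmLoopSketch {
    /** Hypothetical in-memory stand-in for the RM; it is not a YARN class. */
    static class FakeResourceManager {
        private final int grantsPerCall;
        private int pendingAsk = 0;
        private int nextContainerId = 1;
        // Launcher threads report finished containers here.
        final ConcurrentLinkedQueue<Integer> finished = new ConcurrentLinkedQueue<>();

        FakeResourceManager(int grantsPerCall) {
            if (grantsPerCall <= 0) {
                throw new IllegalArgumentException("grantsPerCall must be positive");
            }
            this.grantsPerCall = grantsPerCall;
        }

        void addContainerRequest(int count) {
            pendingAsk += count;
        }

        /** One allocate round trip: grants at most grantsPerCall containers. */
        List<Integer> allocate() {
            List<Integer> granted = new ArrayList<>();
            while (pendingAsk > 0 && granted.size() < grantsPerCall) {
                granted.add(nextContainerId++);
                pendingAsk--;
            }
            return granted;
        }

        /** Containers that completed since the previous call. */
        List<Integer> completedSinceLastCall() {
            List<Integer> done = new ArrayList<>();
            Integer id;
            while ((id = finished.poll()) != null) {
                done.add(id);
            }
            return done;
        }
    }

    /** Runs the AM-style loop and returns the number of completed containers. */
    static int runApplicationMaster(int numTotalContainers, int grantsPerCall) {
        FakeResourceManager rm = new FakeResourceManager(grantsPerCall);
        List<Thread> launchThreads = new ArrayList<>();
        int numRequested = 0;
        int numCompleted = 0;
        try {
            while (numCompleted < numTotalContainers) {
                // Ask only for containers that have not been requested yet.
                int askCount = numTotalContainers - numRequested;
                rm.addContainerRequest(askCount);
                numRequested += askCount;
                for (Integer containerId : rm.allocate()) {
                    // Launch on a separate thread so this loop is never blocked.
                    Thread launchThread = new Thread(() -> rm.finished.add(containerId));
                    launchThreads.add(launchThread);
                    launchThread.start();
                }
                numCompleted += rm.completedSinceLastCall().size();
                Thread.sleep(1); // stand-in for the pause between heartbeats
            }
            for (Thread t : launchThreads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return numCompleted;
    }

    public static void main(String[] args) {
        System.out.println("completed=" + runApplicationMaster(10, 3));
    }
}
```

Because askCount drops to 0 once everything has been requested, later iterations only collect completions, which matches the "askCount=0" line in the AM log.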
The AM log output is attached below:
containerNode=dev81.hadoop:56100, containerNodeURI=dev81.hadoop:8042, containerStateNEW, containerResourceMemory1024
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Current available resources in the cluster <memory:26624, vCores:-5>
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got response from RM for container ask, completedCnt=5
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000007, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000007
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000008, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Connecting to ContainerManager at dev81.hadoop:56100
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000008
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000009, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000009
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000006, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Setting up container launch container for containerid=container_1376966186147_0006_01_000011
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000006
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Setting user in ContainerLaunchContext to: hadoop
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000010, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000010
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Current application state: loop=3, appDone=false, total=10, requested=10, completed=9, failed=0, currentAllocated=10
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Current application state: loop=4, appDone=false, total=10, requested=10, completed=9, failed=0, currentAllocated=10
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Asking RM for containers, askCount=0
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Sending request to RM for containers, progress=0.9
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Got response from RM for container ask, allocatedCnt=0
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Current available resources in the cluster <memory:26624, vCores:-5>
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Got response from RM for container ask, completedCnt=1
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000011, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000011
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Current application state: loop=4, appDone=true, total=10, requested=10, completed=10, failed=0, currentAllocated=10
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Application completed. Signalling finish to RM
13/08/26 17:15:10 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.AMRMClientImpl is stopped.
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Application Master completed successfully. exiting
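The "Current application state" lines are the most compact way to follow a run. As a rough sketch, the key=value pairs on such a line can be pulled out with a regular expression. AmLogStateSketch, parseState and progress are hypothetical helpers, and the completed/total formula is an assumption that happens to agree with the log above (completed=9, total=10, progress=0.9).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class AmLogStateSketch {
    private static final String MARKER = "Current application state:";
    private static final Pattern FIELD = Pattern.compile("(\\w+)=(\\w+)");

    /** Returns the key=value pairs of a state line, or an empty map for other lines. */
    static Map<String, String> parseState(String logLine) {
        Map<String, String> fields = new LinkedHashMap<>();
        int start = logLine.indexOf(MARKER);
        if (start < 0) {
            return fields;
        }
        // Only scan the part after the marker, so the timestamp is ignored.
        Matcher m = FIELD.matcher(logLine.substring(start + MARKER.length()));
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    /** Fraction of containers completed; assumes the map holds "completed" and "total". */
    static double progress(Map<String, String> state) {
        return Double.parseDouble(state.get("completed"))
                / Double.parseDouble(state.get("total"));
    }

    public static void main(String[] args) {
        String line = "13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: "
                + "Current application state: loop=3, appDone=false, total=10, "
                + "requested=10, completed=9, failed=0, currentAllocated=10";
        Map<String, String> state = parseState(line);
        System.out.println(state + " progress=" + progress(state));
    }
}
```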
Reference examples: