YARN Distributedshell Explained

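The Hadoop 2.0 source code ships two YARN-based applications. One is MapReduce; the other is Distributedshell, a sample program that demonstrates how to write an application. You can think of it as the WordCount example of YARN.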

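As its name suggests, distributedshell executes shell commands in a distributed way. A string of shell commands or a shell script submitted by the user is dispatched, under the control of the ApplicationMaster, to different containers for execution.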

The distributedshell source code is located in "hadoop-yarn-project\hadoop-yarn\hadoop-yarn-applications\hadoop-yarn-applications-distributedshell".

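It covers the three interactions that every application has to implement:

1. Client and RM (Client.java): the client submits the application.
2. AM and RM (ApplicationMaster.java): the AM registers itself and requests containers.
3. AM and NM (ApplicationMaster.java): the AM launches the containers.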

Run it with:
hadoop jar hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar -shell_command '/bin/date' -num_containers 10
This starts 10 containers, each of which runs the `date` command.

Execution flow:
1. The client submits the application to the RM through org.apache.hadoop.yarn.applications.distributedshell.Client, supplying an ApplicationSubmissionContext.
2. org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster requests containers and runs the user's command (ContainerLaunchContext.commands) in them.
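To make step 2 concrete, here is a minimal sketch of what each container's ContainerLaunchContext.commands could look like: the user's shell command plus optional arguments, with stdout/stderr redirected into the container log directory, the same way the client redirects the AM's own output to AppMaster.stdout/AppMaster.stderr. This is a stdlib-only model, not the distributedshell source; `ContainerCommandSketch` and its methods are hypothetical, and `<LOG_DIR>` is assumed to be the placeholder that ApplicationConstants.LOG_DIR_EXPANSION_VAR stands for.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model, not Hadoop code: builds the command line each container runs.
class ContainerCommandSketch {
    // Assumed value of ApplicationConstants.LOG_DIR_EXPANSION_VAR: a placeholder
    // that the NodeManager replaces with the container's log directory
    static final String LOG_DIR = "<LOG_DIR>";

    // Build the single command line one container runs
    static String buildCommand(String shellCommand, String shellArgs) {
        StringBuilder sb = new StringBuilder(shellCommand);
        if (!shellArgs.isEmpty()) {
            sb.append(' ').append(shellArgs);
        }
        // Send the command's output to files under the container log directory
        sb.append(" 1>").append(LOG_DIR).append("/stdout");
        sb.append(" 2>").append(LOG_DIR).append("/stderr");
        return sb.toString();
    }

    // One commands list per container; every container gets the same command
    static List<List<String>> commandsForContainers(int numContainers,
                                                    String shellCommand, String shellArgs) {
        String cmd = buildCommand(shellCommand, shellArgs);
        List<List<String>> all = new ArrayList<>();
        for (int i = 0; i < numContainers; i++) {
            all.add(Collections.singletonList(cmd));
        }
        return all;
    }

    public static void main(String[] args) {
        List<List<String>> cmds = commandsForContainers(10, "/bin/date", "");
        System.out.println(cmds.size() + " containers, each runs: " + cmds.get(0).get(0));
    }
}
```

With the example command (`-shell_command '/bin/date' -num_containers 10`), all 10 entries are `/bin/date 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr`.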

Client (Client.java):
1. YarnClient.getNewApplication
2. Fill in the ApplicationSubmissionContext and the ContainerLaunchContext (for the container that runs the AM)
3. YarnClient.submitApplication
4. Periodically call YarnClient.getApplicationReport to get the application status
	// Create the launch context for the AM container
	ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
	// Local resources: AppMaster.jar and log4j.properties
	amContainer.setLocalResources(localResources);
	// Environment: the HDFS location of the shell script, CLASSPATH
	amContainer.setEnvironment(env);
	// Build the command and arguments that start the AM
	Vector<CharSequence> vargs = new Vector<CharSequence>(30);
	vargs.add("${JAVA_HOME}" + "/bin/java");
	vargs.add("-Xmx" + amMemory + "m");
	// AM main class
	vargs.add("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
	vargs.add("--container_memory " + String.valueOf(containerMemory));
	vargs.add("--num_containers " + String.valueOf(numContainers));
	vargs.add("--priority " + String.valueOf(shellCmdPriority));
	if (!shellCommand.isEmpty()) {
		vargs.add("--shell_command " + shellCommand);
	}
	if (!shellArgs.isEmpty()) {
		vargs.add("--shell_args " + shellArgs);
	}
	for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
		vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
	}
	// Redirect the AM's stdout/stderr into the container log directory
	vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
	vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

	// Join the arguments into a single command line
	StringBuilder command = new StringBuilder();
	for (CharSequence str : vargs) {
		command.append(str).append(" ");
	}
	List<String> commands = new ArrayList<String>();
	commands.add(command.toString());
	amContainer.setCommands(commands);
	// Resource requirement; only memory is set here
	Resource capability = Records.newRecord(Resource.class);
	capability.setMemory(amMemory);
	amContainer.setResource(capability);
	appContext.setAMContainerSpec(amContainer);
	// Submit the application to the RM
	super.submitApplication(appContext);
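Step 4 of the client (polling the application status) is not shown in the code above. A minimal sketch of that loop, assuming the current state is obtained through a caller-supplied `Supplier<String>` standing in for `YarnClient.getApplicationReport(appId).getYarnApplicationState()`. The class `ReportPoller`, its parameters, and the `TIMED_OUT`/`INTERRUPTED` markers are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the client's status-polling loop.
class ReportPoller {
    // Terminal values of YarnApplicationState
    static final Set<String> TERMINAL = new HashSet<>(Arrays.asList("FINISHED", "FAILED", "KILLED"));

    // Poll every intervalMs until a terminal state appears or maxPolls is used up
    static String waitForCompletion(Supplier<String> stateSource, long intervalMs, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            String state = stateSource.get();
            if (TERMINAL.contains(state)) {
                return state;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "INTERRUPTED"; // hypothetical marker, not a YARN state
            }
        }
        return "TIMED_OUT"; // hypothetical marker, not a YARN state
    }

    public static void main(String[] args) {
        // Simulated sequence of states that successive reports might return
        Iterator<String> states = Arrays.asList("ACCEPTED", "RUNNING", "RUNNING", "FINISHED").iterator();
        System.out.println(waitForCompletion(states::next, 100, 50));
    }
}
```

A real client would also check the report's final application status once the state is FINISHED, to tell success from failure; that part is left out of this sketch.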

ApplicationMaster (ApplicationMaster.java)
1. AMRMClient.registerApplicationMaster
2. Pass a ContainerRequest to AMRMClient.addContainerRequest
3. Obtain containers through AMRMClient.allocate
4. Hand each container to a newly created LaunchContainerRunnable thread
5. Create a ContainerLaunchContext and set the container launch information: localResource, shellCommand, shellArgs, etc.
6. ContainerManager.startContainer(startReq)
7. Read the completed containers from the response of the next RPC call via AMResponse.getCompletedContainersStatuses
8. AMRMClient.unregisterApplicationMaster
	// Create the AMRMClient. 2.1-beta adds an asynchronous AMRMClient; this version still uses the synchronous one
	resourceManager = new AMRMClientImpl(appAttemptID);
	resourceManager.init(conf);
	resourceManager.start();
	// Register this AM with the RM
	RegisterApplicationMasterResponse response = resourceManager
		.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
			appMasterTrackingUrl);
	while (numCompletedContainers.get() < numTotalContainers && !appDone) {
		// Build the container request; only memory is set as the resource requirement
		ContainerRequest containerAsk = setupContainerAskForRM(askCount);
		resourceManager.addContainerRequest(containerAsk);

		// Send the request to RM
		LOG.info("Asking RM for containers" + ", askCount=" + askCount);
		AMResponse amResp = sendContainerAskToRM();

		// Retrieve list of allocated containers from the response
		List<Container> allocatedContainers = amResp.getAllocatedContainers();
		for (Container allocatedContainer : allocatedContainers) {
			// Launch each container from its own thread so the main loop is not blocked
			LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
				allocatedContainer);
			Thread launchThread = new Thread(runnableLaunchContainer);
			launchThreads.add(launchThread);
			launchThread.start();
		}
		// Statuses of containers that finished since the last call; the full source uses
		// them to update the completed/failed counters (processing omitted here)
		List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
	}
	// Unregister this AM from the RM
	resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
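The control flow of the loop above can be reduced to a small stdlib-only model: one thread per allocated container (the role LaunchContainerRunnable plays), plus counters updated from completed-container statuses that decide when the loop ends. Everything here is an assumption for illustration: `AmLoopSketch` is hypothetical, container IDs are plain strings, "launching" only records the ID, and a non-zero exit status is simply counted as both completed and failed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the AM's launch threads and completion counters.
class AmLoopSketch {
    final int numTotalContainers;
    final AtomicInteger numCompletedContainers = new AtomicInteger();
    final AtomicInteger numFailedContainers = new AtomicInteger();
    // Written from several launch threads, so it must be thread-safe
    final List<String> launched = Collections.synchronizedList(new ArrayList<>());
    private final List<Thread> launchThreads = new ArrayList<>();

    AmLoopSketch(int numTotalContainers) {
        this.numTotalContainers = numTotalContainers;
    }

    // One thread per allocated container so the allocate loop is never blocked
    void launchAll(List<String> allocatedContainerIds) {
        for (String id : allocatedContainerIds) {
            // Stand-in for LaunchContainerRunnable: the real code builds a
            // ContainerLaunchContext and calls startContainer at this point
            Thread t = new Thread(() -> launched.add(id));
            launchThreads.add(t);
            t.start();
        }
    }

    // Stand-in for processing getCompletedContainersStatuses(); exit status 0 means success
    void onCompleted(List<Integer> exitStatuses) {
        for (int exit : exitStatuses) {
            numCompletedContainers.incrementAndGet();
            if (exit != 0) {
                numFailedContainers.incrementAndGet();
            }
        }
    }

    // Same condition as the while loop (without the appDone flag)
    boolean keepLooping() {
        return numCompletedContainers.get() < numTotalContainers;
    }

    void joinLaunchThreads() {
        for (Thread t : launchThreads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        AmLoopSketch am = new AmLoopSketch(10);
        List<String> ids = new ArrayList<>();
        for (int i = 2; i <= 11; i++) {
            ids.add("container_" + i); // hypothetical IDs
        }
        am.launchAll(ids);
        am.joinLaunchThreads();
        am.onCompleted(Collections.nCopies(9, 0)); // nine containers exit with status 0
        System.out.println("keep looping: " + am.keepLooping());
        am.onCompleted(Collections.singletonList(0)); // the tenth one finishes
        System.out.println("keep looping: " + am.keepLooping());
    }
}
```

The usage in `main` mirrors the log below: with 9 of 10 containers completed the loop keeps going, and it stops once the tenth completes.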

The AM log:

containerNode=dev81.hadoop:56100, containerNodeURI=dev81.hadoop:8042, containerStateNEW, containerResourceMemory1024
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Current available resources in the cluster <memory:26624, vCores:-5>
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got response from RM for container ask, completedCnt=5
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000007, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000007
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000008, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Connecting to ContainerManager at dev81.hadoop:56100
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000008
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000009, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000009
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000006, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Setting up container launch container for containerid=container_1376966186147_0006_01_000011
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000006
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Setting user in ContainerLaunchContext to: hadoop
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000010, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000010
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Current application state: loop=3, appDone=false, total=10, requested=10, completed=9, failed=0, currentAllocated=10
13/08/26 17:15:09 INFO distributedshell.ApplicationMaster: Current application state: loop=4, appDone=false, total=10, requested=10, completed=9, failed=0, currentAllocated=10
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Asking RM for containers, askCount=0
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Sending request to RM for containers, progress=0.9
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Got response from RM for container ask, allocatedCnt=0
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Current available resources in the cluster <memory:26624, vCores:-5>
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Got response from RM for container ask, completedCnt=1
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Got container status for containerID=container_1376966186147_0006_01_000011, state=COMPLETE, exitStatus=0, diagnostics=
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Container completed successfully., containerId=container_1376966186147_0006_01_000011
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Current application state: loop=4, appDone=true, total=10, requested=10, completed=10, failed=0, currentAllocated=10
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Application completed. Signalling finish to RM
13/08/26 17:15:10 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.AMRMClientImpl is stopped.
13/08/26 17:15:10 INFO distributedshell.ApplicationMaster: Application Master completed successfully. exiting
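Reading the log: with total=10 and completed=9, the AM sends its next allocate request with progress=0.9, and appDone only becomes true once completed reaches 10. A minimal sketch of that arithmetic, assuming progress is computed as completed/total. That formula is consistent with the log but is an inference here; `ProgressSketch` is hypothetical.

```java
// Hypothetical sketch of the progress/termination arithmetic seen in the log.
class ProgressSketch {
    // Assumption: progress sent to the RM is the fraction of completed containers
    static float progress(int numCompleted, int numTotal) {
        return (float) numCompleted / numTotal;
    }

    // The AM finishes once every requested container has completed
    static boolean appDone(int numCompleted, int numTotal) {
        return numCompleted == numTotal;
    }

    public static void main(String[] args) {
        System.out.println(progress(9, 10));  // prints 0.9, matching "progress=0.9"
        System.out.println(appDone(9, 10));   // prints false, matching "completed=9 ... appDone=false"
        System.out.println(appDone(10, 10));  // prints true, matching "completed=10 ... appDone=true"
    }
}
```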




Original article: http://blog.csdn.net/lalaguozhe/article/details/10361367. Please credit the source when reposting.

