1. 引言
hadoop 0.23系列后,就对Map/Reduce进行了完全重写,把原有的MR计算框架替代成了新一代Yarn计算框架- Yarn。Yarn有一点类似于Mesos(它们之间的对比请看如下链接How does YARN compare to Mesos?),运行在hdfs之上,用JAVA写的计算资源管理框架。而Map/Reduce被看成是一个成熟的应用框架运行在Yarn计算框架之上。我们可以在Yarn上开发各种各样的计算应用。业界也开始着手在Yarn上开发自己相应的计算应用。例如:Tez, Hama,storm, kitten等,详情可以参考董西成大哥的一篇文章<汇总运行在Hadoop YARN上的开源系统>。
其实我们了解Yarn的工作原理,并且熟悉其工作流程和运行机制。我们自己也可以实现相应的hadoop计算应用。在hadoop的官方文章里也写了一个thread,来介绍如何写一个yarn应用: Hadoop MapReduce Next Generation - Writing YARN Applications。这个thread主要简单讲了一下如何写一个简单的client, container和appmaster并且与其它节点(RM, NM)进行通信,完成所需任务。
Hama是一个hadoop生态系统中的图计算和迭代计算框架,有一点类似于google的号称是新三架马车之一的Pregel。在Pregel论文发表之时,google就有近30%的计算采用Pregel计算框架来完成。它们两者的主要思想就是实现一个BSP模型。这类模型适合大规模迭代的图计算,这在我之前的一篇博文中提到Hama Kmeans Clustering学习。
hama本来就可以在独立地运行如果我们称传统的hadoop版本是包含hdfs和mapreduce的话,那么可以称hama是hdfs和bsp的组合。它同时也实现了yarn框架,能够使bsp应用运行在yarn之上。
据说,Hama是除了map/reduce应用之外,第一个在yarn之上实现自己的应用。我们将以它为例子来介绍如何写出自己个性化的yarn应用,因为其比hadoop自带的MR2 yarn实现更加简单,更加容易学习。
2. hama yarn应用
正因为hama既能够运行在传统的hadoop系统之上,也能够运行的yarn之上,因此它在写yarn应用实现时,其序列/反序列化机制既用了那种传统的Writable,也有protobuf这类工具。client与appmaster是用Writable机制来获取超步superStep信息,而client与RM通信是用后者提交任务或查看任务的状态信息。
2.1. 用户创建Job
如果hama做一个单机上面执行,那么只需要用构造BSPJob,进而提交任务。如果在yarn中执行,就只需要把BSPJob换成YarnBSPJob即可以在yarn上运行该任务。如下是构造BSP任务代码。
/** * Creates a basic job with sequencefiles as in and output. */ public static BSPJob createJob(Configuration cnf, Path in, Path out, boolean textOut) throws IOException { HamaConfiguration conf = new HamaConfiguration(cnf); BSPJob job = new BSPJob(conf, KMeansBSP.class); //用于yarn框架下,请把上面一行注释掉,再把下一行取消注释 //BSPJob job = new YarnBSPJob(conf, KMeansBSP.class); //此处初始化一堆job参数, 省略... return job; }
2.2. YARNBSPJob
要使BSP应用在Yarn上运行,需要有client向resourcemanager提交相应任务。YarnBSPJob就是做类似的工作。YarnBSPJob建立了与resourcemanager的代理applicationsManager,并且与之通信。
public class YARNBSPJob extends BSPJob { private static final Log LOG = LogFactory.getLog(YARNBSPJob.class); private static volatile int id = 0; private YARNBSPJobClient submitClient; // 通过submitClient.launchJob提交任务 private BSPClient client; // private boolean submitted; //看该job是否提交 private ApplicationReport report; // 应用的汇报信息 private ClientRMProtocol applicationsManager; // 与resourcemanager通信的代理对象 private YarnRPC rpc; // 创建一个与rm交互的客户端代理 public YARNBSPJob(HamaConfiguration conf) throws IOException { super(conf); submitClient = new YARNBSPJobClient(conf); YarnConfiguration yarnConf = new YarnConfiguration(conf); this.rpc = YarnRPC.create(conf); //创建一个rm代理对象,准备与rm通信 InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); this.applicationsManager = ((ClientRMProtocol) rpc.getProxy( ClientRMProtocol.class, rmAddress, conf)); //与rm建立代理连接 } //省略... }
2.3. 提交应用launchJob
在初始化RM代理之后,对该任务的相关class,xml等文件进行打jar包操作,然后把这jar包上传到hdfs上面去,再向RM提交该Job。提交BSP任务需要首先向RM请求新的application资源。
GetNewApplicationRequest request = Records .newRecord(GetNewApplicationRequest.class); GetNewApplicationResponse response = job.getApplicationsManager() .getNewApplication(request); id = response.getApplicationId(); LOG.debug("Got new ApplicationId=" + id);
再先产生一个ContainerLaunchContext对象,填充一些环境参数(环境变量env map,job文件所在本地路径localResources,appmaster的container启动命令command,设置内存参数capability)之后,然后会构造ApplicationSubmissionContext对象(appid, appname, appmaster container上述配置)构造完再向RM提交应用。
// Create a new ApplicationSubmissionContext ApplicationSubmissionContext appContext = Records .newRecord(ApplicationSubmissionContext.class); // set the ApplicationId appContext.setApplicationId(this.id); // set the application name appContext.setApplicationName(job.getJobName()); // Create a new container launch context for the AM's container ContainerLaunchContext amContainer = Records .newRecord(ContainerLaunchContext.class); // Define the local resources required Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); // Lets assume the jar we need for our ApplicationMaster is available in // HDFS at a certain known path to us and we want to make it available to // the ApplicationMaster in the launched container //初始化localResources, 省略... // Set the local resources into the launch context amContainer.setLocalResources(localResources); // Set up the environment needed for the launch context Map<String, String> env = new HashMap<String, String>(); // Assuming our classes or jars are available as local resources in the // working directory from which the command will be run, we need to append // "." to the path. // By default, all the hadoop specific classpaths will already be available // in $CLASSPATH, so we should be careful not to overwrite it. String classPathEnv = "$CLASSPATH:./*:"; env.put("CLASSPATH", classPathEnv); amContainer.setEnvironment(env); // Construct the command to be executed on the launched container // 产生能够启动appmaster的命令 String command = "${JAVA_HOME}" + "/bin/java -cp " + classPathEnv + " " + BSPApplicationMaster.class.getCanonicalName() + " " + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"; LOG.debug("Start command: " + command); amContainer.setCommands(Collections.singletonList(command)); Resource capability = Records.newRecord(Resource.class); // we have at least 3 threads, which comsumes 1mb each, for each bsptask and // a base usage of 100mb capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100)); LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!"); amContainer.setResource(capability); // Set the container launch content into the ApplicationSubmissionContext appContext.setAMContainerSpec(amContainer); // Create the request to send to the ApplicationsManager SubmitApplicationRequest appRequest = Records .newRecord(SubmitApplicationRequest.class); //封装提交请求 appRequest.setApplicationSubmissionContext(appContext); job.getApplicationsManager().submitApplication(appRequest);//提交application应用请求
当job向RM提交后,我们就可以向RM获得applicationMaster的运行状态。
GetApplicationReportRequest reportRequest = Records .newRecord(GetApplicationReportRequest.class); reportRequest.setApplicationId(id); while (report == null || report.getHost().equals("N/A")) { GetApplicationReportResponse reportResponse = job .getApplicationsManager().getApplicationReport(reportRequest); report = reportResponse.getApplicationReport();//获得appmaster的运行状态,包括:运行机器,端口等。 try { Thread.sleep(1000L); } catch (InterruptedException e) { LOG.error( "Got interrupted while waiting for a response report from AM.", e); } }
2.4. Hadoop ResourceManager启动Job
客户端向ResourceManager提交请求后,RM最后通过ClientRMService直接调用RMapplication事件处理器来同步处理,而不是采用yarn提供的异步dispatcher来处理,这样的好处是客户端会立即知道提交的Job是不是初始化完成并启动成功。在这其中,ClientRMService实现了ClientRMProtocol服务,它会加载相关参数(yarn.resourcemanager.address)开启服务端口(默认端口:8040)来接收服务。
// 在submitApplication里面,直接调用RMAppManager.handle来同步处理请求 rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System .currentTimeMillis()));但是ClientRMService在submitApplication时先创建一个RMAppImpl实例,然后通过AsyncDispatcher来异步处理RMAppEvent事件。
// Create RMApp application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, clientTokenStr, appStore, this.scheduler, this.masterService, submitTime); //省略... // All done, start the RMApp this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.START));ResourceManager是一个大服务,它包含了各种各样,形形色色的小服务,在设计模式里,采用的是一种典型的组合模式。它在初始化时会其rmDispatcher注册事件类型RMAppEventType, RMAppAttemptEventType等。
// Register event handler for RmAppEvents this.rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(this.rmContext)); // Register event handler for RmAppAttemptEvents this.rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(this.rmContext));因此RMAppEvent事件会被ApplicationEventDispatcher来消化处理,ApplicationEventDispatcher会通过RMAppImpl.handle来修改该RMAppImpl的状态机为appmaster container节点创建RMAppAttemptImpl。同样,RMAppAttemptImp也维护着一个自有的状态机,根据状态机的当前状态和迁移状态来执行相应的逻辑。当 RMAppAttemptImp进入allocated状态时,就会向某个NodeManager提交创建container 。 下面是org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.launch()向某个NM提交启动AMContainer请求。
private void launch() throws IOException { connect(); //application.getMasterContainer()给出的是已经分配到某NM上运行的container, //它里面有相应NM的nodeId。 ContainerId masterContainerID = application.getMasterContainer().getId(); ApplicationSubmissionContext applicationContext = application.getSubmissionContext(); LOG.info("Setting up container " + application.getMasterContainer() + " for AM " + application.getAppAttemptId()); //准备好启动appmaster container时所需要的参数 ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID); StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class); request.setContainerLaunchContext(launchContext); containerMgrProxy.startContainer(request); //通过NM代理containerMgrProxy向某一个NM提交启动container的请求 LOG.info("Done launching container " + application.getMasterContainer() + " for AM " + application.getAppAttemptId()); }
2.5 Hadoop NodeManager启动Job
在NodeManager一端接收到startContainer请求,则会对每个请求产生两个带有状态机的对象:ContainerImpl和ApplicationImpl。//创建container状态机 Container container = new ContainerImpl(getConfig(), this.dispatcher, launchContext, credentials, metrics); ApplicationId applicationID = containerID.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerID, container) != null) { NMAuditLogger.logFailure(launchContext.getUser(), AuditConstants.START_CONTAINER, "ContainerManagerImpl", "Container already running on this node!", applicationID, containerID); throw RPCUtil.getRemoteException("Container " + containerID + " already is running on this node!!"); } //创建一个application Application application = new ApplicationImpl(dispatcher, this.aclsManager, launchContext.getUser(), applicationID, credentials, context); if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); dispatcher.getEventHandler().handle( new ApplicationInitEvent(applicationID, container .getLaunchContext().getApplicationACLs())); } // TODO: Validate the request dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container));
最后通过org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher.handle(ContainersLauncherEvent)来创建一个新的container进程。
switch (event.getType()) { case LAUNCH_CONTAINER: Application app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); //准备好启动一个新的Container环境变量和相关参数 ContainerLaunch launch = new ContainerLaunch(getConfig(), dispatcher, exec, app, event.getContainer(), dirsHandler); running.put(containerId, new RunningContainer(containerLauncher.submit(launch), launch)); //通过containerLauncher.submit(launch)创建一个新container进程,这个是一个阻塞进程,直到子进程退出。 break; case CLEANUP_CONTAINER: //省略... }
3. Hama AppMaster实现
Hama的实现了一个appmaster是org.apache.hama.bsp.BSPApplicationMaster。它的构造函数如下:
private BSPApplicationMaster(String[] args) throws Exception { // 传进来的参数是jobconf的路径名 if (args.length != 1) { throw new IllegalArgumentException(); } this.jobFile = args[0]; this.localConf = new YarnConfiguration(); //加入用户提交 的参数 this.jobConf = getSubmitConfiguration(jobFile); //bsp的名称 this.applicationName = jobConf.get("bsp.job.name", "<no bsp job name defined>"); if (applicationName.isEmpty()) { this.applicationName = "<no bsp job name defined>"; } //获得appAttemptId this.appAttemptId = getApplicationAttemptId(); //产生一个yarnRPC,该RPC是用于yarn框架,在hama里主要用于产生一个RM代理用于与resourceManager进行通信。 this.yarnRPC = YarnRPC.create(localConf); this.clock = new SystemClock(); this.startTime = clock.getTime(); this.jobId = new BSPJobID(appAttemptId.toString(), 0); //appmaster开启一个端口,与客户端进行通信 this.hostname = BSPNetUtils.getCanonicalHostname(); this.clientPort = BSPNetUtils.getFreePort(12000); // start our synchronization service 启动BSP同步服务,该服务会通过zookeeper server来管理服务的同步, startSyncServer(); // 启动提供给客户端使用的BSPClient clientServer, 与其它bsp task container进行通信的BSPPeerProtocol taskServer,同时会设置hama.umbilical.address为hostname:taskServerPort startRPCServers(); /* * Make sure that this executes after the start the RPC servers, because we * are readjusting the configuration. */ //把上面重新改动的配置写到hdfs文件里面 rewriteSubmitConfiguration(jobFile, jobConf); //把在客户端分好的split list从hdfs上读出来 String jobSplit = jobConf.get("bsp.job.split.file"); splits = null; if (jobSplit != null) { DataInputStream splitFile = fs.open(new Path(jobSplit)); try { splits = BSPJobClient.readSplitFile(splitFile); } finally { splitFile.close(); } } //产生一个appmaster与resourceManager通信代理 this.amrmRPC = getYarnRPCConnection(localConf); registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, "http://localhost:8080"); }
startRPCServers()方法创建了两个RPC server,这些server中的其中一个是用来与提交任务的clientServer,接口类为BSPClient,它有一个getCurrentSuperStep()方法用于获得该job已经运行到哪一超步了;另外一个taskServer用于与所属这个job的任务节点进行通信,接口类是BSPPeerProtocol,其里面有两个重要的方法:getTask(用于获得该节点的任务信息,用于task节点初始化)和statusUpdate(更新task节点的状态)。
当初始化appmaster后,会向RM申请集群的资源信息,RM计算好该分配的资源信息后,会返回一堆NM列表, appmaster会在这些节点列表上启动其相应的BSP Task。
@Override public JobState startJob() throws Exception { //初始化列表,产生的数量是bsp.peers.num个 this.allocatedContainers = new ArrayList<Container>(numBSPTasks); while (allocatedContainers.size() < numBSPTasks) { //向resourceManager申请bsp task container节点的资源。createBSPTaskRequest方法是请求资源ResourceRequest列表;releasedContainers是该job已经不再使用,需要释放的containerId列表。 AllocateRequest req = BuilderUtils.newAllocateRequest( appAttemptId, lastResponseID, 0.0f, createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb, priority), releasedContainers); //向RM申请资源,启动numBSPTasks个bsp container任务 AllocateResponse allocateResponse = resourceManager.allocate(req); AMResponse amResponse = allocateResponse.getAMResponse(); LOG.info("Got response! ID: " + amResponse.getResponseId() + " with num of containers: " + amResponse.getAllocatedContainers().size() + " and following resources: " + amResponse.getAvailableResources().getMemory() + "mb"); this.lastResponseID = amResponse.getResponseId(); // availableResources = amResponse.getAvailableResources(); this.allocatedContainers.addAll(amResponse.getAllocatedContainers()); LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers..."); Thread.sleep(1000l); } //RM同时把numBSPTasks个container任务分配给了这个application。 LOG.info("Got " + allocatedContainers.size() + " containers!"); int id = 0; for (Container allocatedContainer : allocatedContainers) { LOG.info("Launching task on a new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerState" + allocatedContainer.getState() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); // Connect to ContainerManager on the allocated container //建立与container所属的NM进行通信。 String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort(); InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); //NM代理 ContainerManager cm = (ContainerManager) yarnRPC.getProxy( ContainerManager.class, cmAddress, conf); //构造一个BSPTaskLauncher用于启动container BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id, allocatedContainer, cm, conf, jobFile, jobId); launchers.put(id, runnableLaunchContainer); //启动container。在这方法里调用setupContainer方法把JAVA命令行,环境变量,BSP相关参数都传给NM,调用NM的containerManager服务(org.apache.hadoop.yarn.api.ContainerManager.startContainer(StartContainerRequest))去启动一个子进程BSPRunner container,并且获得该container的状态信息。 runnableLaunchContainer.start(); completionQueue.add(runnableLaunchContainer); id++; } LOG.info("Waiting for tasks to finish..."); state = JobState.RUNNING; int completed = 0; while (completed != numBSPTasks) { for (BSPTaskLauncher task : completionQueue) { BSPTaskStatus returnedTask = task.poll(); //从每个runnableLaunchContainer中获得状态信息。 // if our task returned with a finished state if (returnedTask != null) { if (returnedTask.getExitStatus() != 0) { LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!"); state = JobState.FAILED; return state; } else { LOG.info("Task \"" + returnedTask.getId() + "\" sucessfully finished!"); completed++; LOG.info("Waiting for " + (numBSPTasks - completed) + " tasks to finish!"); } cleanupTask(returnedTask.getId()); } } Thread.sleep(1000L); } state = JobState.SUCCESS; return state; }4. Container启动流程 在Hama yarn框架的实现里,appmaster是一个特殊的container,它启动的方式跟普通的container一样,事实上其它的yarn实现也是通过这种流程。如下列出两类container进程的启动流程。
- 普通container:Appmaster先向RM申请资源,然后appmaster向构造java命令等环境参数去通知NM去fork一个子进程setup任务节点。
- appmaster container:client端先构造java命令等环境参数,然后去向RM申请appmaster container资源,RM通知NM fork一个子进程appmaster,启动这个appmaster的节点信自己会返回给RM和client。