MapReduce on YARN from the Source Code (Part 1)

This series does not dig deeply into the YARN source itself. The focus is on how the MapReduce computation model actually runs, whether on YARN or on MR1.

Some of the material here overlaps with the YARN source-code series; reading the two side by side gives a deeper picture.

This article is based on the Hadoop 2.6.5 source code.

Let's start from the very beginning. Say I submit a simple job whose main method contains:

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length < 2) {
			System.err.println("Usage: wordcount <in> [<in>...] <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		for (int i = 0; i < otherArgs.length - 1; ++i) {
			FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		}
		FileOutputFormat.setOutputPath(job, new Path(
				otherArgs[otherArgs.length - 1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	

Apart from the first line, which creates a Configuration, plus a few lines of argument checking, almost everything revolves around Job, so Job is clearly the class to look at. Let's pick out some of its code:

static {
		ConfigUtil.loadResources();
	}

The resource loading in this static block is important:

public static void loadResources() {
		addDeprecatedKeys();
		Configuration.addDefaultResource("mapred-default.xml");
		Configuration.addDefaultResource("mapred-site.xml");
		Configuration.addDefaultResource("yarn-default.xml");
		Configuration.addDefaultResource("yarn-site.xml");
	}

The first call, addDeprecatedKeys(), is kept for compatibility with older code: it registers mappings for configuration keys that are now deprecated. We won't go into it.

Next, four configuration files are registered, and note that these are only the default resources:

/**
	 * Add a default resource. Resources are loaded in the order of the
	 * resources added.
	 * 
	 * @param name
	 *            file name. File should be present in the classpath.
	 */
	public static synchronized void addDefaultResource(String name) {
		if (!defaultResources.contains(name)) {
			defaultResources.add(name);
			for (Configuration conf : REGISTRY.keySet()) {
				if (conf.loadDefaults) {
					conf.reloadConfiguration();
				}
			}
		}
	}

This is a static synchronized method. Note the class-initialization order: static blocks run before any constructor does. Also, loadDefaults is true by default, so every Configuration already registered gets reloaded when a new default resource is added.

So the very first thing the framework does is register these four configuration files; defaultResources itself is a CopyOnWriteArrayList.
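To make the effect concrete, here is a tiny, hedged illustration; the printed values are assumptions that depend on what your mapred-site.xml/yarn-site.xml actually contain:

// once Job's static block has run, loadResources() has registered the
// MapReduce/YARN defaults, so a Configuration created afterwards sees them too
Configuration c = new Configuration();                       // loadDefaults == true
System.out.println(c.get("mapreduce.framework.name"));       // e.g. "yarn" if set in mapred-site.xml
System.out.println(c.get("yarn.resourcemanager.address"));   // e.g. "rm-host:8032" (hypothetical)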

Let's look at reloadConfiguration:

public synchronized void reloadConfiguration() {
		properties = null; // trigger reload
		finalParameters.clear(); // clear site-limits
	}

This reload method wipes out everything loaded so far, so the properties are rebuilt, including the newly added default resources, on the next access. Now back to the Job constructor:

public Job(Configuration conf, String jobName) throws IOException {
		this(conf);
		setJobName(jobName);
	}

This is the constructor our driver actually calls; the key part is the first statement, this(conf):

public Job(Configuration conf) throws IOException {
		this(new JobConf(conf));
	}

Let's unpack the JobConf side. Among JobConf's constructors there is a Class-based one, which records the jar that contains the given class via setJarByClass:

public JobConf(Class exampleClass) {
		setJarByClass(exampleClass);
		checkAndWarnDeprecation();
	}
public void setJarByClass(Class cls) {
		String jar = ClassUtil.findContainingJar(cls);
		if (jar != null) {
			setJar(jar);
		}
	}

The conf we pass in carries no jar, so nothing interesting happens on that path; checkAndWarnDeprecation only emits warnings about deprecated settings, so we skip it as well.

There is one more layer outside; let's open it up:

Job(JobConf conf) throws IOException {
		super(conf, null);
		// propagate existing user credentials to job
		this.credentials.mergeAll(this.ugi.getCredentials());
		this.cluster = null;
	}

Naturally, we should look at the super constructor:

public JobContextImpl(Configuration conf, JobID jobId) {
		if (conf instanceof JobConf) {
			this.conf = (JobConf) conf;
		} else {
			this.conf = new JobConf(conf);
		}
		this.jobId = jobId;
		this.credentials = this.conf.getCredentials();
		try {
			this.ugi = UserGroupInformation.getCurrentUser();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

So this is where it happens: the conf reaching this point is already a JobConf.

Good, we now have a Job. The next several statements in the driver just populate fields on the Job (mapper, combiner, reducer, output key/value classes), so we skip them.

The remaining statements before the last one (the input-path loop and setOutputPath) likewise just populate the Job, so let's go straight to waitForCompletion in the final statement:

public boolean waitForCompletion(boolean verbose) throws IOException,
		InterruptedException, ClassNotFoundException {
	if (state == JobState.DEFINE) {
		submit();
	}
	// ... progress monitoring / polling elided ...
	return isSuccessful();
}

By default state == JobState.DEFINE, so submit() runs:

public void submit() throws IOException, InterruptedException,
			ClassNotFoundException {
		ensureState(JobState.DEFINE);
		setUseNewAPI();
		connect();
		final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),
				cluster.getClient());
		status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
			public JobStatus run() throws IOException, InterruptedException,
					ClassNotFoundException {
				return submitter.submitJobInternal(Job.this, cluster);
			}
		});
		state = JobState.RUNNING;
		LOG.info("The url to track the job: " + getTrackingURL());
	}

Let's take it piece by piece:

private void ensureState(JobState state) throws IllegalStateException {
		if (state != this.state) {
			throw new IllegalStateException("Job in state " + this.state
					+ " instead of " + state);
		}

		if (state == JobState.RUNNING && cluster == null) {
			throw new IllegalStateException("Job in state " + this.state
					+ ", but it isn't attached to any job tracker!");
		}
	}

Nothing much here, just some extra validation. Next, setUseNewAPI:

// the first line of setUseNewAPI():
int numReduces = conf.getNumReduceTasks();

// JobConf.getNumReduceTasks():
public int getNumReduceTasks() {
	return getInt(JobContext.NUM_REDUCES, 1);
}

The very first thing it does is determine the number of reduce tasks, and this matters: by default there is exactly one ReduceTask, which conveniently funnels the final results into a single output.
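If one reducer is not enough, the default is easy to override in the driver (the value 4 below is arbitrary):

job.setNumReduceTasks(4);                  // per-job override
// or, before the Job is created:
conf.setInt("mapreduce.job.reduces", 4);   // the key behind JobContext.NUM_REDUCES

Back in submit(), the call itself: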

		setUseNewAPI();

The logic inside looks a little involved, but it only decides whether the new API is in use; in our case it is, by default, so we won't step through it. A rough sketch of the decision follows.
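Roughly, the decision looks like this (a simplified sketch, not the verbatim 2.6.5 code):

// use the new-API Mapper/Reducer unless an old-API class was explicitly configured
conf.setBooleanIfUnset("mapred.mapper.new-api",
		conf.get("mapred.mapper.class") == null);
conf.setBooleanIfUnset("mapred.reducer.new-api",
		conf.get("mapred.reducer.class") == null);

The next call in submit() is connect():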

		connect();

This method is short but extremely important:

private synchronized void connect() throws IOException,
			InterruptedException, ClassNotFoundException {
		if (cluster == null) {
			cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
				public Cluster run() throws IOException, InterruptedException,
						ClassNotFoundException {
					return new Cluster(getConfiguration());
				}
			});
		}
	}

The ugi was already initialized earlier (in JobContextImpl):

/**
 * User and group information for Hadoop. This class wraps around a JAAS Subject
 * and provides methods to determine the user's username and groups. It supports
 * both the Windows, Unix and Kerberos login modules.
 */
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "HBase", "Hive",
		"Oozie" })
@InterfaceStability.Evolving
public class UserGroupInformation

The Javadoc is clear enough, so we won't analyze the class: it records the user and group information under which Hadoop operates.
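For reference, this is roughly how the UGI API gets used along this path (a minimal sketch; the doAs body is just a placeholder):

UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
System.out.println(ugi.getShortUserName());   // the submitting user
Credentials creds = ugi.getCredentials();     // the tokens merged into the Job earlier
ugi.doAs(new PrivilegedExceptionAction<Void>() {
	public Void run() {
		// privileged work runs as that user
		return null;
	}
});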

Pay attention to the Cluster constructor; getConfiguration() here is simply the JobConf we passed in:

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
			throws IOException {
		this.conf = conf;
		this.ugi = UserGroupInformation.getCurrentUser();
		initialize(jobTrackAddr, conf);
	}

The third statement, initialize, is the one to watch:

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
			throws IOException {
		synchronized (frameworkLoader) {
			for (ClientProtocolProvider provider : frameworkLoader) {
				LOG.debug("Trying ClientProtocolProvider : "
						+ provider.getClass().getName());
				ClientProtocol clientProtocol = null;
				try {
					if (jobTrackAddr == null) {
						clientProtocol = provider.create(conf);
					} else {
						clientProtocol = provider.create(jobTrackAddr, conf);
					}

					if (clientProtocol != null) {
						clientProtocolProvider = provider;
						client = clientProtocol;
						LOG.debug("Picked " + provider.getClass().getName()
								+ " as the ClientProtocolProvider");
						break;
					} else {
						LOG.debug("Cannot pick "
								+ provider.getClass().getName()
								+ " as the ClientProtocolProvider - returned null protocol");
					}
				} catch (Exception e) {
					LOG.info("Failed to use " + provider.getClass().getName()
							+ " due to error: " + e.getMessage());
				}
			}
		}

		if (null == clientProtocolProvider || null == client) {
			throw new IOException(
					"Cannot initialize Cluster. Please check your configuration for "
							+ MRConfig.FRAMEWORK_NAME
							+ " and the correspond server addresses.");
		}
	}

The frameworkLoader used here is a static field:

private static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader
			.load(ClientProtocolProvider.class);
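ServiceLoader works off provider-configuration files under META-INF/services on the classpath; a minimal illustration of the mechanism (not Hadoop code, the println is just for demonstration):

ServiceLoader<ClientProtocolProvider> loader =
		ServiceLoader.load(ClientProtocolProvider.class);
for (ClientProtocolProvider provider : loader) {
	// with the usual MR client jars present this prints
	// YarnClientProtocolProvider and LocalClientProtocolProvider
	System.out.println(provider.getClass().getName());
}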

We won't trace ServiceLoader's internals any further; the point is that it discovers, through the current class loader, every ClientProtocolProvider registered this way. And since the jobTrackAddr passed in on a first-time submission is null, provider.create(conf) is the call that runs:

// back in Cluster.initialize():
clientProtocol = provider.create(conf);

// LocalClientProtocolProvider.create():
public ClientProtocol create(Configuration conf) throws IOException {
	String framework = conf.get(MRConfig.FRAMEWORK_NAME,
			MRConfig.LOCAL_FRAMEWORK_NAME);
	if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
		return null;
	}
	conf.setInt(JobContext.NUM_MAPS, 1);
	return new LocalJobRunner(conf);
}

What this tells us is that LocalClientProtocolProvider only answers when mapreduce.framework.name is "local"; when it is set to yarn, this provider returns null and it is YarnClientProtocolProvider that supplies the ClientProtocol, namely a YARNRunner. So, to run on YARN, this setting is the one to watch:

mapreduce.framework.name=yarn

Otherwise the job is submitted locally via LocalJobRunner.
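For comparison, YarnClientProtocolProvider.create is essentially the mirror image; roughly (quoted from memory, so treat the details as a sketch):

public ClientProtocol create(Configuration conf) throws IOException {
	if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
		return new YARNRunner(conf);
	}
	return null;
}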

Naturally, we have to look at how this YARNRunner is initialized:

public YARNRunner(Configuration conf) {
	this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
}

// the two-argument constructor (not shown) adds a ClientCache and delegates here:
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
		ClientCache clientCache) {
	this.conf = conf;
	try {
		this.resMgrDelegate = resMgrDelegate;
		this.clientCache = clientCache;
		this.defaultFileContext = FileContext.getFileContext(this.conf);
	} catch (UnsupportedFileSystemException ufe) {
		throw new RuntimeException("Error in instantiating YarnClient", ufe);
	}
}

Note that the JobConf we passed in is wrapped here in a new YarnConfiguration (it is not a cast). We won't go through YarnConfiguration itself; it mostly defines YARN's own configuration keys and defaults.

public ResourceMgrDelegate(YarnConfiguration conf) {
		super(ResourceMgrDelegate.class.getName());
		this.conf = conf;
		this.client = YarnClient.createYarnClient();
		init(conf);
		start();
	}

Look closely at this constructor: in essence it sets up an RPC connection to the RM and makes it available to YARNRunner. Since RPC is involved, it is worth following, because some familiar address settings come into play:

@Public
	public static YarnClient createYarnClient() {
		YarnClient client = new YarnClientImpl();
		return client;
	}

So the YarnClient created here is really a YarnClientImpl. Its construction is simple; what matters are its serviceInit and serviceStart methods. A good habit when reading YARN source: every service class has these two methods, giving a uniform way to initialize and start a service; a minimal sketch of that lifecycle follows.
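To make that habit concrete, every YARN service extends AbstractService and is driven through the same lifecycle (MyService below is a hypothetical example, not a Hadoop class):

// init() calls serviceInit(conf), start() calls serviceStart(),
// stop() calls serviceStop() -- the hooks each service overrides
Service svc = new MyService();   // in our case, the YarnClientImpl inside ResourceMgrDelegate
svc.init(conf);
svc.start();
// ... use the service ...
svc.stop();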

We'll skip serviceInit and look at serviceStart, which is the key one:

rmClient = ClientRMProxy.createRMProxy(getConfig(),
					ApplicationClientProtocol.class);

ApplicationClientProtocol is the protocol an application uses to talk to the RM, and it sits on top of Hadoop RPC. This is exactly where that RPC connection gets built, so let's see how the RM address and related settings are resolved:

We won't dwell on getConfig(); it comes from AbstractService and returns the current configuration:

public static <T> T createRMProxy(final Configuration configuration,
			final Class<T> protocol) throws IOException {
		return createRMProxy(configuration, protocol, INSTANCE);
	}

Note the INSTANCE argument; it is created by a static initializer:

	private static final ClientRMProxy INSTANCE = new ClientRMProxy();

Moving on:

/**
	 * Create a proxy for the specified protocol. For non-HA, this is a direct
	 * connection to the ResourceManager address. When HA is enabled, the proxy
	 * handles the failover between the ResourceManagers as well.
	 */
	@Private
	protected static <T> T createRMProxy(final Configuration configuration,
			final Class<T> protocol, RMProxy instance) throws IOException {
		YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration
				: new YarnConfiguration(configuration);
		RetryPolicy retryPolicy = createRetryPolicy(conf);
		if (HAUtil.isHAEnabled(conf)) {
			RMFailoverProxyProvider<T> provider = instance
					.createRMFailoverProxyProvider(conf, protocol);
			return (T) RetryProxy.create(protocol, provider, retryPolicy);
		} else {
			InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
			LOG.info("Connecting to ResourceManager at " + rmAddress);
			T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
			return (T) RetryProxy.create(protocol, proxy, retryPolicy);
		}
	}

This piece is crucial. Our conf is already a YarnConfiguration, so no wrapping is needed; assuming the RM is not running in HA mode, we go straight into the else branch:

protected InetSocketAddress getRMAddress(YarnConfiguration conf,
		Class<?> protocol) throws IOException {
	if (protocol == ApplicationClientProtocol.class) {
		return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
				YarnConfiguration.DEFAULT_RM_ADDRESS,
				YarnConfiguration.DEFAULT_RM_PORT);
	}
	// ... branches for the other RM protocols (admin, AM) elided ...
}

There it is: this is where our configuration is actually consumed. The relevant YarnConfiguration keys resolve the address used to build the RPC connection:

public static final String RM_PREFIX = "yarn.resourcemanager.";
public static final String RM_ADDRESS = RM_PREFIX + "address";
public static final int DEFAULT_RM_PORT = 8032;
public static final String DEFAULT_RM_ADDRESS = "0.0.0.0:"
		+ DEFAULT_RM_PORT;

So, by default, the RPC connection targets port 8032 on the RM. And if yarn.resourcemanager.address is not configured, the default 0.0.0.0 effectively points at the local machine, which means jobs could only be submitted from the RM host itself.
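In code, that simply means making sure the client-side configuration carries the real RM address (the host name below is hypothetical):

// equivalent to setting yarn.resourcemanager.address in yarn-site.xml
conf.set(YarnConfiguration.RM_ADDRESS, "rm-host:8032");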

At this point an RPC connection to the RM has been established. The rmClient holding that connection is a field of YarnClientImpl; YarnClientImpl is a member of ResourceMgrDelegate; and ResourceMgrDelegate is a member of YARNRunner. So the picture is clear: most of YARNRunner's interaction with the RM goes over this RPC connection.

Back in submit(), the next step builds a JobSubmitter and then calls its submitJobInternal:

final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),
				cluster.getClient());
/** Only for mocking via unit tests. */
	@Private
	public JobSubmitter getJobSubmitter(FileSystem fs,
			ClientProtocol submitClient) throws IOException {
		return new JobSubmitter(fs, submitClient);
	}

When the Cluster was created earlier it already held the RPC connection to the RM, i.e. client; both arguments here (the file system and the submit client) are obtained through it, and the JobSubmitter keeps hold of them.

Now let's walk carefully through submitJobInternal's logic:

		checkSpecs(job);

First, the output check:

private void checkSpecs(Job job) throws ClassNotFoundException,
			InterruptedException, IOException {
		JobConf jConf = (JobConf) job.getConfiguration();
		// Check the output specification
		if (jConf.getNumReduceTasks() == 0 ? jConf.getUseNewMapper() : jConf
				.getUseNewReducer()) {
			org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
					.newInstance(job.getOutputFormatClass(),
							job.getConfiguration());
			output.checkOutputSpecs(job);
		} else {
			jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
		}
	}

The logic is simple and, as the name suggests, mainly verifies the output specification, in particular whether the output directory already exists; we will come back to it in more detail shortly.

Next, the second statement, which is more involved:

		Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

A static method call locates the staging directory for this job submission:

/**
 * Initializes the staging directory and returns the path. It also keeps
 * track of all necessary ownership & permissions
 * 
 * @param cluster
 * @param conf
 */
public static Path getStagingDir(Cluster cluster, Configuration conf) { ... }

// Cluster.getStagingAreaDir() is delegated through the ClientProtocol
// (YARNRunner -> ResourceMgrDelegate), where it ends up here:
public String getStagingAreaDir() throws IOException, InterruptedException {
	// Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
	String user = UserGroupInformation.getCurrentUser().getShortUserName();
	Path path = MRApps.getStagingAreaDir(conf, user);
	LOG.debug("getStagingAreaDir: dir=" + path);
	return path.toString();
}

This is one of the few calls on the YARNRunner path that does not need to talk to the RM: it mainly works out (and ensures the existence of) the staging directory for this job submission; a hedged sketch of the path it resolves to follows.
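For reference, MRApps.getStagingAreaDir resolves to a path of roughly this shape (a hedged sketch; the default of /tmp/hadoop-yarn/staging comes from yarn.app.mapreduce.am.staging-dir):

// e.g. /tmp/hadoop-yarn/staging/<user>/.staging with default settings
Path staging = new Path(
		conf.get("yarn.app.mapreduce.am.staging-dir", "/tmp/hadoop-yarn/staging")
		+ Path.SEPARATOR + user + Path.SEPARATOR + ".staging");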

One more word on the checkSpecs method quoted above:


It first instantiates our OutputFormat class via reflection; checkOutputSpecs itself lives in FileOutputFormat:

public void checkOutputSpecs(JobContext job)
			throws FileAlreadyExistsException, IOException {
		// Ensure that the output directory is set and not already there
		Path outDir = getOutputPath(job);
		if (outDir == null) {
			throw new InvalidJobConfException("Output directory not set.");
		}

		// get delegation token for outDir's file system
		TokenCache.obtainTokensForNamenodes(job.getCredentials(),
				new Path[] { outDir }, job.getConfiguration());

		if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
			throw new FileAlreadyExistsException("Output directory " + outDir
					+ " already exists");
		}
	}

Suppose the output path we submitted is hdfs://host:port/output, and focus on the final check:

outDir.getFileSystem(job.getConfiguration()).exists(outDir)

First, getFileSystem:

/** Return the FileSystem that owns this Path. */
	public FileSystem getFileSystem(Configuration conf) throws IOException {
		return FileSystem.get(this.toUri(), conf);
	}

This method is defined on Path and locates the file system that owns the path via the static FileSystem.get. toUri() turns the Path into a URI, which you can think of as the path string plus its scheme, here an hdfs:// path:

Inside FileSystem.get there is this piece worth a look:

if (scheme != null && authority == null) {        // no authority
	URI defaultUri = getDefaultUri(conf);
	if (scheme.equals(defaultUri.getScheme())      // if scheme matches default
			&& defaultUri.getAuthority() != null) {    // & default has authority
		return get(defaultUri, conf);              // return default
	}
}

It fetches the default file-system URI:

/**
	 * Get the default filesystem URI from a configuration.
	 * 
	 * @param conf
	 *            the configuration to use
	 * @return the uri of the default filesystem
	 */
	public static URI getDefaultUri(Configuration conf) {
		return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
	}

public static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
public static final String FS_DEFAULT_NAME_DEFAULT = "file:///";

So here is another familiar setting: the file system is resolved from fs.defaultFS, which defaults to the local file system (file:///).
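A quick, hedged example of what this means for a client (the NameNode address below is hypothetical):

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://namenode-host:8020"); // otherwise file:/// is assumed
FileSystem fs = FileSystem.get(conf);                  // a DistributedFileSystem in that case

Continuing inside FileSystem.get, when caching is disabled for the scheme, a new file system instance is created on the spot: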

if (conf.getBoolean(disableCacheName, false)) {
			return createFileSystem(uri, conf);
		}

We won't trace createFileSystem; it instantiates the file system for the scheme, which under our configuration is HDFS, and that ends up calling DistributedFileSystem.initialize:

@Override
	public void initialize(URI uri, Configuration conf) throws IOException {
		super.initialize(uri, conf);
		setConf(conf);

		String host = uri.getHost();
		if (host == null) {
			throw new IOException("Incomplete HDFS URI, no host: " + uri);
		}
		homeDirPrefix = conf.get(DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
				DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);

		this.dfs = new DFSClient(uri, conf, statistics);
		this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
		this.workingDir = getHomeDirectory();
	}

Notice that a DFSClient is created here. We won't dig into it; the class comment says enough:

/********************************************************
 * DFSClient can connect to a Hadoop Filesystem and 
 * perform basic file tasks.  It uses the ClientProtocol
 * to communicate with a NameNode daemon, and connects 
 * directly to DataNodes to read/write block data.
 *
 * Hadoop DFS users should obtain an instance of 
 * DistributedFileSystem, which uses DFSClient to handle
 * filesystem tasks.
 *
 ********************************************************/
@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    DataEncryptionKeyFactory 

As the comment explains, DFSClient talks to the NameNode over ClientProtocol and, following the NameNode's directions, talks directly to the DataNodes for block data. For the purposes of this article, its job is to copy the local job files into the HDFS cluster.
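Purely as an illustration of that job (the address and paths are hypothetical), copying a local file into HDFS through the FileSystem API looks like this:

FileSystem fs = FileSystem.get(URI.create("hdfs://namenode-host:8020"), new Configuration());
fs.copyFromLocalFile(new Path("file:///tmp/wordcount.jar"),
		new Path("/tmp/hadoop-yarn/staging/demo/.staging/job_0001/job.jar"));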

We skip over some of the remaining code and jump to this method:

/**
	 * configure the jobconf of the user with the command line options of
	 * -libjars, -files, -archives.
	 * 
	 * @param job
	 * @throws IOException
	 */
	private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
			throws IOException {
		JobResourceUploader rUploader = new JobResourceUploader(jtFs);
		rUploader.uploadFiles(job, jobSubmitDir);

		// Set the working directory
		if (job.getWorkingDirectory() == null) {
			job.setWorkingDirectory(jtFs.getWorkingDirectory());
		}
	}

Before this method runs, the job's resources still sit on the local machine; here a JobResourceUploader uploads them to HDFS. The method is long, so we'll only pick out a few parts:

// in JobResourceUploader.uploadFiles():
short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION,
		Job.DEFAULT_SUBMIT_REPLICATION);

// Job:
public static final int DEFAULT_SUBMIT_REPLICATION = 10;

By default the job files are written with a replication factor of 10, to avoid a hotspot when many freshly started tasks all fetch the same files at once.
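This factor is tunable; on a small cluster you might lower it (the value 3 below is just an example):

// Job.SUBMIT_REPLICATION resolves to this key:
conf.setInt("mapreduce.client.submit.file.replication", 3);

Continuing in uploadFiles, four kinds of resources are read from the configuration: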

String files = conf.get("tmpfiles");
		String libjars = conf.get("tmpjars");
		String archives = conf.get("tmparchives");
		String jobJar = job.getJar();

The resources fall into four groups: extra files (-files), third-party jars (-libjars), archives (-archives), and the job jar itself.
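These keys are normally filled in by GenericOptionsParser from the -files / -libjars / -archives command-line options; setting them by hand would look like this (paths are hypothetical):

conf.set("tmpfiles", "file:///home/demo/lookup.txt");
conf.set("tmpjars", "file:///home/demo/extra-lib.jar");
conf.set("tmparchives", "file:///home/demo/dictionaries.zip");

Further down in submitJobInternal comes the split computation: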

			int maps = writeSplits(job, submitJobDir);

Let's look at this call to see how the input splits for the job are actually worked out:

@SuppressWarnings("unchecked")
	private <T extends InputSplit> int writeNewSplits(JobContext job,
			Path jobSubmitDir) throws IOException, InterruptedException,
			ClassNotFoundException {
		Configuration conf = job.getConfiguration();
		InputFormat<?, ?> input = ReflectionUtils.newInstance(
				job.getInputFormatClass(), conf);

		List<InputSplit> splits = input.getSplits(job);
		T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

		// sort the splits into order based on size, so that the biggest
		// go first
		Arrays.sort(array, new SplitComparator());
		JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
				jobSubmitDir.getFileSystem(conf), array);
		return array.length;
	}

Because we are on the new API, writeNewSplits above is the path taken. We won't analyze the file-system-heavy parts; it is enough to know that, roughly, the input files are carved into splits using the HDFS block size as the yardstick; see the sketch below.
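The core of FileInputFormat's sizing decision boils down to one line (a simplified sketch of computeSplitSize; minSize and maxSize come from mapreduce.input.fileinputformat.split.minsize / .maxsize):

// with the default min/max settings this is simply the HDFS block size,
// so roughly one split per block
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

Finally, submitJobInternal hands the job off to the cluster: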

status = submitClient.submitJob(jobId, submitJobDir.toString(),
				job.getCredentials());

jobId was obtained from the RM earlier in submitJobInternal (via getNewJobID), submitJobDir is the staging directory to which all the job files have been copied, and credentials carries the various security tokens.

Finally, let's make the overall process explicit:

  1. Over the whole process we establish two connections to the Hadoop cluster: an RPC connection held by YARNRunner for talking to the RM, and the streaming interface held by DFSClient for handling the file traffic with the NameNode and DataNodes.
  2. Before the ApplicationMaster is formally submitted, the client first asks the cluster for a globally unique ApplicationId; then the local files the program needs are registered with the NameNode over DFSClient's RPC connection and streamed into the HDFS cluster, since the files are not small and pushing them through the RPC channel would be awkward.


Reposted from blog.csdn.net/u013384984/article/details/80341907