This series does not go deeply into the Yarn source code itself; the focus is on how the MapReduce computation model actually gets run, whether on Yarn or on MR1.
Some of the material here overlaps with the Yarn source-code series; reading the two side by side makes for a deeper understanding.
This article is based on the Hadoop 2.6.5 source code.
Let's start from the very beginning. Say I submit a simple job whose main method contains the following:
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
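The TokenizerMapper and IntSumReducer referenced above are the standard WordCount map and reduce classes; for completeness, here is a sketch of them (they live as static inner classes of WordCount and need the usual java.util, org.apache.hadoop.io and org.apache.hadoop.mapreduce imports):

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // emit (token, 1) for every token of the input line
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    // sum the counts of one token; the driver also reuses this as the combiner
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}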
Back to the driver: apart from the first statement, which creates a Configuration, and a few lines of argument checking, nearly everything revolves around Job. The Job class is clearly important, so let's look at selected parts of its code:
static {
    ConfigUtil.loadResources();
}
This static initializer loads resources and matters a great deal:
public static void loadResources() {
    addDeprecatedKeys();
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
}
The first call, addDeprecatedKeys, is kept for compatibility with older code: it registers a number of configuration keys that are deprecated and no longer used, so we won't dwell on it.
Next, four configuration files are loaded, and note that these four are only the default resources:
/**
 * Add a default resource. Resources are loaded in the order of the
 * resources added.
 *
 * @param name file name. File should be present in the classpath.
 */
public static synchronized void addDefaultResource(String name) {
    if (!defaultResources.contains(name)) {
        defaultResources.add(name);
        for (Configuration conf : REGISTRY.keySet()) {
            if (conf.loadDefaults) {
                conf.reloadConfiguration();
            }
        }
    }
}
This is a static synchronized method, and here is the point to notice: class initialization runs the static block before any constructor does, and loadDefaults is true by default.
So the very first thing the system does is load these four configuration files; also note that defaultResources is a CopyOnWriteArrayList.
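A side note on what the ordering means: resources added later override those added earlier, which is exactly why the site files can override the defaults. A minimal sketch, assuming hadoop-common and a mapred-site.xml on the classpath:

import org.apache.hadoop.conf.Configuration;

public class ConfLoadOrder {
    public static void main(String[] args) {
        // defaults first, site file second: a key present in both
        // ends up with the value from mapred-site.xml
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
        Configuration conf = new Configuration();
        System.out.println(conf.get("mapreduce.framework.name", "local"));
    }
}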
Let's look at the reloadConfiguration method:
public synchronized void reloadConfiguration() {
    properties = null;       // trigger reload
    finalParameters.clear(); // clear site-limits
}
This reload method starts by sweeping away every parameter loaded so far. Now, back to the Job constructor that our driver calls:
public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
}
The constructor we invoke is actually this one; the key part is the first call, this(conf):
public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
}
Let's take this call apart. JobConf has several constructors; the one below takes a class and records the jar that contains it:
public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
}
public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    if (jar != null) {
        setJar(jar);
    }
}
For the configuration we pass in, no jar class is involved at this point, so this path does nothing for us; as for checkAndWarnDeprecation, it only warns about deprecated keys, so we skip it as well.
There is one more layer outside; let's open it up:
Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
}
Naturally, we need to look at the super constructor:
public JobContextImpl(Configuration conf, JobID jobId) {
    if (conf instanceof JobConf) {
        this.conf = (JobConf) conf;
    } else {
        this.conf = new JobConf(conf);
    }
    this.jobId = jobId;
    this.credentials = this.conf.getCredentials();
    try {
        this.ugi = UserGroupInformation.getCurrentUser();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

So here it is: by this point the conf we passed in is already a JobConf, and this is also where ugi gets initialized.
Good: we now have a Job. The next several statements of the driver just set members on the Job (mapper, combiner, reducer, output key and value classes), so we pass over them.
The statements that set the input and output paths are, in essence, also just populating the Job, so let's go straight to the waitForCompletion method:
public boolean waitForCompletion(boolean verbose)
        throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
        submit();
    }
    // ... progress monitoring elided; the method finally returns isSuccessful()
}
By default state == JobState.DEFINE, so submit is executed:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),
        cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
                ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}
Let's take it piece by piece:
private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
        throw new IllegalStateException("Job in state " + this.state
            + " instead of " + state);
    }
    if (state == JobState.RUNNING && cluster == null) {
        throw new IllegalStateException("Job in state " + this.state
            + ", but it isn't attached to any job tracker!");
    }
}
Nothing much to say here, just a couple of extra validations. Next comes setUseNewAPI, which begins by reading the number of reduce tasks:
int numReduces = conf.getNumReduceTasks();
public int getNumReduceTasks() {
    return getInt(JobContext.NUM_REDUCES, 1);
}
So the very first thing the code pins down is the number of reduce tasks. This matters: by default there is a single ReduceTask (the key behind JobContext.NUM_REDUCES is mapreduce.job.reduces), which conveniently funnels the final results into one output.
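For instance, to override that default in the driver (a sketch):

// type-safe setter on the Job
job.setNumReduceTasks(4);
// equivalent, if set on the Configuration before the Job is created:
conf.setInt("mapreduce.job.reduces", 4);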
setUseNewAPI();
The logic inside setUseNewAPI looks a bit involved, but all it does is establish that the new API is used; since that is the default in our case, we won't analyze it.
connect();
This next method is short but extremely important:
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run() throws IOException, InterruptedException,
                    ClassNotFoundException {
                return new Cluster(getConfiguration());
            }
        });
    }
}
As noted earlier, ugi was already initialized in the JobContextImpl constructor:
/**
 * User and group information for Hadoop. This class wraps around a JAAS Subject
 * and provides methods to determine the user's username and groups. It supports
 * both the Windows, Unix and Kerberos login modules.
 */
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "HBase", "Hive", "Oozie" })
@InterfaceStability.Evolving
public class UserGroupInformation
The comment is clear enough: this class records the user and group information under which the Hadoop system operates, so we won't analyze it further.
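We will keep running into ugi.doAs below, so here is a minimal sketch of the pattern; the filesystem lookup is just an illustrative action:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

// inside a method that may throw IOException/InterruptedException
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
    // the action runs with the privileges of the wrapped JAAS Subject
    public FileSystem run() throws Exception {
        return FileSystem.get(new Configuration());
    }
});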
Now pay attention to the Cluster constructor. getConfiguration here simply returns the JobConf we passed in, and new Cluster(conf) delegates to the two-argument constructor below with a null jobTrackAddr:
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
        throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
}
Note the third statement, the call to initialize:
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
        throws IOException {
    synchronized (frameworkLoader) {
        for (ClientProtocolProvider provider : frameworkLoader) {
            LOG.debug("Trying ClientProtocolProvider : "
                + provider.getClass().getName());
            ClientProtocol clientProtocol = null;
            try {
                if (jobTrackAddr == null) {
                    clientProtocol = provider.create(conf);
                } else {
                    clientProtocol = provider.create(jobTrackAddr, conf);
                }

                if (clientProtocol != null) {
                    clientProtocolProvider = provider;
                    client = clientProtocol;
                    LOG.debug("Picked " + provider.getClass().getName()
                        + " as the ClientProtocolProvider");
                    break;
                } else {
                    LOG.debug("Cannot pick " + provider.getClass().getName()
                        + " as the ClientProtocolProvider - returned null protocol");
                }
            } catch (Exception e) {
                LOG.info("Failed to use " + provider.getClass().getName()
                    + " due to error: " + e.getMessage());
            }
        }
    }

    if (null == clientProtocolProvider || null == client) {
        throw new IOException(
            "Cannot initialize Cluster. Please check your configuration for "
                + MRConfig.FRAMEWORK_NAME
                + " and the correspond server addresses.");
    }
}
The frameworkLoader used here is a static member:
private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
    ServiceLoader.load(ClientProtocolProvider.class);
We won't trace ServiceLoader's internals; in essence it finds every class visible to the current class loader that implements the ClientProtocolProvider interface. Since jobTrackAddr is null on our first submission, the single-argument create method is what runs:
clientProtocol = provider.create(conf);
public ClientProtocol create(Configuration conf) throws IOException {
    String framework = conf.get(MRConfig.FRAMEWORK_NAME,
        MRConfig.LOCAL_FRAMEWORK_NAME);
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
        return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);
}
This is the create method of LocalClientProtocolProvider: if mapreduce.framework.name is anything other than local it returns null, and the loop in initialize moves on to the next provider; its counterpart, YarnClientProtocolProvider, only returns a client protocol (a YARNRunner) when the framework is yarn. So, to use Yarn, we must take care that this is configured:
mapreduce.framework.name=yarn
Otherwise the job is submitted locally via LocalJobRunner.
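As for how the providers are found in the first place: ServiceLoader reads the META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider entries of the jars on the classpath, so YarnClientProtocolProvider is only a candidate when the Yarn client jars are present. Programmatically, the configuration above can also be set in the driver; a sketch using the constants from MRConfig:

// equivalent to mapreduce.framework.name=yarn in mapred-site.xml;
// MRConfig.FRAMEWORK_NAME is "mapreduce.framework.name" and
// MRConfig.YARN_FRAMEWORK_NAME is "yarn"
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);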
Naturally, we have to look at how this YARNRunner is initialized:
public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
}
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
        ClientCache clientCache) {
    this.conf = conf;
    try {
        this.resMgrDelegate = resMgrDelegate;
        this.clientCache = clientCache;
        this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
        throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
}
Note that the JobConf passed in is wrapped into a YarnConfiguration here (a copying wrapper around the existing configuration, not a cast). We won't walk through YarnConfiguration itself; it defines a great many Yarn-specific configuration keys.
public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    this.client = YarnClient.createYarnClient();
    init(conf);
    start();
}
Look closely at this constructor: in essence it establishes an RPC connection to the RM and hands that connection out for YARNRunner to use. Since RPC is involved, let's dig in, because some of the addresses we routinely configure come into play here:
@Public
public static YarnClient createYarnClient() {
    YarnClient client = new YarnClientImpl();
    return client;
}
The YarnClient here is actually a YarnClientImpl. Its construction is not hard to follow, but the important parts are its serviceInit and serviceStart methods. It is a habit worth forming when reading Yarn source code: every service class has these two methods, which perform initialization and startup in a uniform way.
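To make that habit concrete, here is a minimal sketch (not Hadoop code) of the service pattern: subclasses of AbstractService override serviceInit/serviceStart/serviceStop, and callers drive the NOTINITED -> INITED -> STARTED -> STOPPED lifecycle through init/start/stop:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;

public class DemoService extends AbstractService {
    public DemoService() {
        super("DemoService");
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        // read configuration and create child services here
        super.serviceInit(conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        // open connections and start worker threads here
        super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
        // release resources here
        super.serviceStop();
    }
}
// usage: s.init(conf); s.start(); ... s.stop();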
We'll skip serviceInit here; serviceStart is the critical one:
rmClient = ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class);
ApplicationClientProtocol is the protocol our application uses to talk to the RM, and it is built on top of RPC; this is precisely where the RPC connection gets established. Let's look at this code and see how the RM address and the other settings are determined.
We'll skip the getConfig call, which comes from AbstractService and returns the current configuration:
public static <T> T createRMProxy(final Configuration configuration,
        final Class<T> protocol) throws IOException {
    return createRMProxy(configuration, protocol, INSTANCE);
}
Note the INSTANCE passed along, which is initialized statically:
private static final ClientRMProxy INSTANCE = new ClientRMProxy();
Moving on:
/**
 * Create a proxy for the specified protocol. For non-HA, this is a direct
 * connection to the ResourceManager address. When HA is enabled, the proxy
 * handles the failover between the ResourceManagers as well.
 */
@Private
protected static <T> T createRMProxy(final Configuration configuration,
        final Class<T> protocol, RMProxy instance) throws IOException {
    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
        ? (YarnConfiguration) configuration
        : new YarnConfiguration(configuration);
    RetryPolicy retryPolicy = createRetryPolicy(conf);
    if (HAUtil.isHAEnabled(conf)) {
        RMFailoverProxyProvider<T> provider = instance
            .createRMFailoverProxyProvider(conf, protocol);
        return (T) RetryProxy.create(protocol, provider, retryPolicy);
    } else {
        InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
        LOG.info("Connecting to ResourceManager at " + rmAddress);
        T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
        return (T) RetryProxy.create(protocol, proxy, retryPolicy);
    }
}
This code is crucial. Our conf is already a YarnConfiguration, so no conversion happens. Here we confine ourselves to the non-HA ResourceManager case and go straight into the else branch:
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
        Class<?> protocol) throws IOException {
    if (protocol == ApplicationClientProtocol.class) {
        return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_PORT);
    }
    // branches for the other protocols elided
}
Now we can see it: this is where our configuration is actually consumed; the relevant keys in YarnConfiguration are what establish the RPC connection:
public static final String RM_PREFIX = "yarn.resourcemanager.";
public static final String RM_ADDRESS = RM_PREFIX + "address"; // yarn.resourcemanager.address
public static final int DEFAULT_RM_PORT = 8032;
public static final String DEFAULT_RM_ADDRESS = "0.0.0.0:" + DEFAULT_RM_PORT;
So by default the client connects to port 8032 on the RM to set up the RPC connection; and if we leave RM_ADDRESS unconfigured, jobs can effectively only be submitted on the RM host itself, since 0.0.0.0 stands for the local machine.
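In other words, on any machine other than the RM itself, the client-side configuration must point at the real RM host; a sketch, with a hypothetical host name:

// without this, the client falls back to 0.0.0.0:8032, which only
// works when submitting from the RM host itself
conf.set(YarnConfiguration.RM_ADDRESS, "rm-host.example.com:8032");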
At this point an RPC connection for talking to the RM is in place. The rmClient that holds it is a member of YarnClientImpl, YarnClientImpl is in turn a member of ResourceMgrDelegate,
and ResourceMgrDelegate is a member of YARNRunner. So now we understand: most of YARNRunner's interactions with the RM travel over this one held RPC connection.
Next, on the way to submitJobInternal, submit first obtains the JobSubmitter:
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
/** Only for mocking via unit tests. */
@Private
public JobSubmitter getJobSubmitter(FileSystem fs, ClientProtocol submitClient)
        throws IOException {
    return new JobSubmitter(fs, submitClient);
}
When the Cluster was created earlier, it already held the RPC connection to the RM, namely client; the JobSubmitter built here keeps both that client and the filesystem, and does its work through them.
Now let's walk carefully through the logic of submitJobInternal:
checkSpecs(job);
The first step is the output check:
private void checkSpecs(Job job) throws ClassNotFoundException,
        InterruptedException, IOException {
    JobConf jConf = (JobConf) job.getConfiguration();
    // Check the output specification
    if (jConf.getNumReduceTasks() == 0
            ? jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
        org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
            ReflectionUtils.newInstance(job.getOutputFormatClass(),
                job.getConfiguration());
        output.checkOutputSpecs(job);
    } else {
        jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
    }
}
The logic is straightforward: as the method name suggests, it verifies the output specification, chiefly that the output directory is set and does not already exist; we come back to it in more detail below.
The second statement is more involved:
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
This static call locates the staging directory for this job submission:
/**
 * Initializes the staging directory and returns the path. It also keeps
 * track of all necessary ownership & permissions
 *
 * @param cluster
 * @param conf
 */
public static Path getStagingDir(Cluster cluster, Configuration conf)
public String getStagingAreaDir() throws IOException, InterruptedException {
    // Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    Path path = MRApps.getStagingAreaDir(conf, user);
    LOG.debug("getStagingAreaDir: dir=" + path);
    return path.toString();
}
This is one of the few operations on the YARNRunner path that needs no interaction with the RM; it mainly computes, client-side, the staging directory for this job submission.
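What MRApps.getStagingAreaDir computes boils down to the following; a sketch, with the key name and default as found in the 2.x MRJobConfig:

// staging root from yarn.app.mapreduce.am.staging-dir,
// default /tmp/hadoop-yarn/staging; each user gets a .staging subdir
String user = UserGroupInformation.getCurrentUser().getShortUserName();
String stagingRoot = conf.get("yarn.app.mapreduce.am.staging-dir",
    "/tmp/hadoop-yarn/staging");
Path stagingDir = new Path(stagingRoot + "/" + user + "/.staging");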
As promised, a few more words on the checkSpecs method shown earlier.
First, our OutputFormat class is obtained via reflection; the checkOutputSpecs method it then calls is defined in FileOutputFormat:
public void checkOutputSpecs(JobContext job)
        throws FileAlreadyExistsException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null) {
        throw new InvalidJobConfException("Output directory not set.");
    }

    // get delegation token for outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { outDir }, job.getConfiguration());

    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir
            + " already exists");
    }
}
Suppose the output path we submit is hdfs://host:port/output, and let's put the focus on the final check:
outDir.getFileSystem(job.getConfiguration()).exists(outDir)
First, getFileSystem:
/** Return the FileSystem that owns this Path. */
public FileSystem getFileSystem(Configuration conf) throws IOException {
    return FileSystem.get(this.toUri(), conf);
}
This method, defined on Path, finds the filesystem that owns the path, using FileSystem's static get method; toUri produces the path's URI, which you can think of as the path itself together with its scheme, e.g. an hdfs:// prefix.
Inside FileSystem.get there is one branch worth a look:
if (scheme != null && authority == null) {          // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())       // if scheme matches default
            && defaultUri.getAuthority() != null) { // & default has authority
        return get(defaultUri, conf);               // return default
    }
}
It fetches the default filesystem URI:
/**
 * Get the default filesystem URI from a configuration.
 *
 * @param conf the configuration to use
 * @return the uri of the default filesystem
 */
public static URI getDefaultUri(Configuration conf) {
    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
}
public static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
public static final String FS_DEFAULT_NAME_DEFAULT = "file:///";
So here we meet another familiar configuration key: the default filesystem is read from fs.defaultFS, falling back to the local filesystem (file:///) when unset.
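A quick sketch of what this resolution means in practice (URIs illustrative):

Configuration conf = new Configuration();
// with fs.defaultFS = hdfs://namenode:8020, a scheme-less path resolves to HDFS
FileSystem defaultFs = FileSystem.get(conf);
// an explicit scheme on the Path always wins over fs.defaultFS
FileSystem outFs = new Path("hdfs://namenode:8020/output").getFileSystem(conf);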
if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
}
We won't follow createFileSystem; it creates the filesystem instance (under our configuration, an HDFS one) and internally calls DistributedFileSystem's initialize method:
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);

    String host = uri.getHost();
    if (host == null) {
        throw new IOException("Incomplete HDFS URI, no host: " + uri);
    }
    homeDirPrefix = conf.get(DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);

    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir = getHomeDirectory();
}
Notice that a DFSClient is created here. We'll skip its internals; the class comment tells us enough:
/********************************************************
 * DFSClient can connect to a Hadoop Filesystem and
 * perform basic file tasks. It uses the ClientProtocol
 * to communicate with a NameNode daemon, and connects
 * directly to DataNodes to read/write block data.
 *
 * Hadoop DFS users should obtain an instance of
 * DistributedFileSystem, which uses DFSClient to handle
 * filesystem tasks.
 ********************************************************/
@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    DataEncryptionKeyFactory
The comment makes DFSClient's role clear: it talks to the NameNode via ClientProtocol and connects directly to DataNodes to read and write block data. For this article's analysis, its job is chiefly to copy the local job files into the HDFS cluster.
We skip over some intervening code and look at this method:
/**
 * configure the jobconf of the user with the command line options of
 * -libjars, -files, -archives.
 *
 * @param job
 * @throws IOException
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
        throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);

    // Set the working directory
    if (job.getWorkingDirectory() == null) {
        job.setWorkingDirectory(jtFs.getWorkingDirectory());
    }
}
Before this method runs, the job's resources still sit on the local machine; it is the JobResourceUploader here that uploads them to HDFS. The method body is long, so we pick out a few parts:
short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION,
    Job.DEFAULT_SUBMIT_REPLICATION);
public static final int DEFAULT_SUBMIT_REPLICATION = 10;
By default the submitted job files are written with a replication factor of 10, which helps avoid the hotspot of every task pulling the same files at startup:
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
The resources fall into four kinds: temporary files, third-party jars, temporary archives, and the jar of the submitted job itself.
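These keys are populated by GenericOptionsParser from the standard command-line options (-files becomes tmpfiles, -libjars becomes tmpjars, -archives becomes tmparchives); an illustrative invocation, with jar and path names made up:

hadoop jar wordcount.jar WordCount \
    -files dict.txt -libjars mylib.jar -archives data.tgz \
    /input /output

JobResourceUploader then copies each of these, along with the job jar, into the staging directory with the replication factor we just saw.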
int maps = writeSplits(job, submitJobDir);
Now let's look at this statement and see how the job's input splits are actually produced:
@SuppressWarnings("unchecked")
private <T extends InputSplit> int writeNewSplits(JobContext job,
        Path jobSubmitDir) throws IOException, InterruptedException,
        ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input = ReflectionUtils.newInstance(
        job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}
Because we use the new API, we land in writeNewSplits above. We won't chase the logic that leans heavily on the filesystem; it's enough to know that input files are carved up roughly along HDFS block boundaries, as sketched below, and that the resulting split files are written into the submit directory.
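The split-size rule FileInputFormat applies under the new API is easy to state; a sketch of the computation, with the 2.x key names in the comments:

// splitSize = max(minSize, min(maxSize, blockSize))
long blockSize = 128L * 1024 * 1024; // e.g. the file's dfs.blocksize
long minSize = 1L;                   // mapreduce.input.fileinputformat.split.minsize
long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
// => one InputSplit per 128 MB block here, hence one map task per block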
status = submitClient.submitJob(jobId, submitJobDir.toString(),
    job.getCredentials());

At the very end, the job is handed over to the RM: jobId is the ID returned when a new job ID was first requested from the cluster, submitJobDir is the directory whose files have already been copied up to the cluster, and credentials carries the various authorization tokens.
To close, let's make the overall flow explicit:
- Over the course of submission, we established two connections to the Hadoop cluster: an RPC connection held by YARNRunner for interacting with the RM, and a DFSClient for interacting with the NameNode and for moving file data.
- Before the ApplicationMaster is actually submitted, the client first asks the cluster for an ApplicationId, which is globally unique. The files the program needs are then shipped off the local machine: the metadata exchange with the NN goes over the RPC connection DFSClient holds, while the file contents themselves are streamed into the HDFS cluster, since the files are not small and pushing them through the RPC channel would be unwieldy.