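Let's walk through the source code first and summarize once the steps are familiar. Following the whole walkthrough should give you a much better understanding of the MapReduce client.
I'm using IDEA, and the Hadoop version is 2.7.2. Let's start analyzing the client source.
Client source code analysis
First, note that the client submits a job through the job's waitForCompletion method, so let's step into it.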
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
//Check whether the state is DEFINE (the job has been defined but not yet submitted)
if (state == JobState.DEFINE) {
//The core of the actual submission
submit();
}
if (verbose) {
//Monitor the job and print its progress
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
//Return whether the job succeeded
return isSuccessful();
}
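The non-verbose branch above is an ordinary polling loop: check, sleep, check again. Here is a minimal sketch of that pattern in plain Java, with no Hadoop classes involved. `JobPoller`, `waitUntil` and the interval values are names and numbers I made up for illustration. Unlike the excerpt, the sketch restores the interrupt flag instead of swallowing the exception.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the polling branch of waitForCompletion. */
class JobPoller {
    /**
     * Checks isComplete, sleeping pollIntervalMillis between checks,
     * and returns how many checks were made.
     */
    static int waitUntil(BooleanSupplier isComplete, long pollIntervalMillis) {
        int checks = 1;
        while (!isComplete.getAsBoolean()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        long deadline = System.currentTimeMillis() + 50;
        int checks = waitUntil(() -> System.currentTimeMillis() >= deadline, 10);
        System.out.println("job finished after " + checks + " status checks");
    }
}
```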
Next, let's step into submit, the core of the submission.
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
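//Check once more that the state is DEFINE
ensureState(JobState.DEFINE);
//Settle which API the job uses: Hadoop has evolved over a long time and the old API still exists, so the job is switched to the new API here (unless old-API classes were configured)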
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
We can first step into ensureState(JobState.DEFINE); to see how the state is actually checked.
private void ensureState(JobState state) throws IllegalStateException {
//If the current state is not the expected one (DEFINE here), throw an exception right away
if (state != this.state) {
throw new IllegalStateException("Job in state "+ this.state +
" instead of " + state);
}
if (state == JobState.RUNNING && cluster == null) {
throw new IllegalStateException
("Job in state " + this.state
+ ", but it isn't attached to any job tracker!");
}
}
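The effect of this guard is that a job can only be submitted once. A small plain-Java sketch of the idea follows. `JobStateDemo` is a hypothetical class, not Hadoop's Job, and it keeps only the two states seen in the excerpt.

```java
/** Hypothetical miniature of the DEFINE -> RUNNING guard. */
class JobStateDemo {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;

    /** Throws if the current state is not the expected one. */
    private void ensureState(JobState expected) {
        if (expected != state) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    /** Mimics submit(): only allowed while the job is still in DEFINE. */
    void submit() {
        ensureState(JobState.DEFINE);
        state = JobState.RUNNING;
    }

    JobState getState() {
        return state;
    }

    public static void main(String[] args) {
        JobStateDemo job = new JobStateDemo();
        job.submit();
        try {
            job.submit(); // second submission is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```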
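Now let's step back out and continue with the connect() method shown above. Before it is called, two things have already happened:
1. The state was checked 2. The new API was selected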
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
//Check whether cluster is null. This is the first run, so no Cluster object exists yet
if (cluster == null) {
cluster =
//This is an anonymous inner class
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
//Create a new Cluster object and pass our Configuration into it
return new Cluster(getConfiguration());
}
});
}
}
Click through new Cluster to see how the Cluster is created.
public Cluster(Configuration conf) throws IOException {
//Overloaded constructor; it delegates to the one below
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
//Keep a reference to the Configuration (it carries the usual configuration files)
this.conf = conf;
//Get the current user
this.ugi = UserGroupInformation.getCurrentUser();
//Run the initialization
initialize(jobTrackAddr, conf);
}
Inspecting conf at this point shows the configuration files we normally use, such as core-default.xml.
Next, let's step into the initialize method.
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
//Synchronize on the provider loader
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
//Debug logging; can be ignored
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
// The client protocol starts out as null
ClientProtocol clientProtocol = null;
try {
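//If jobTrackAddr is null, create the client protocol from the configuration alone
if (jobTrackAddr == null) {
// The provider that matches the configured runtime creates the protocol. I'm running locally, so I get the local one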
clientProtocol = provider.create(conf);
} else {
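//Reached only when a jobTrackAddr was supplied. We passed null, so this branch is skipped here; under YARN the branch above creates a YARNRunner instead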
clientProtocol = provider.create(jobTrackAddr, conf);
}
//Remember the provider and client that were picked (the rest is logging); let's continue
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: ", e);
}
......
}
}
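That completes initialize, and with it connect: a Cluster object has been created. So far, submission has done three things:
- Checked the state
- Selected the new API
- Created the Cluster object (with a different runner depending on whether the runtime is local or yarn)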
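The third step boils down to "ask each provider in turn and keep the first one that returns a non-null client". The sketch below shows that selection in plain Java. The `Provider` interface, both provider classes and the returned strings are hypothetical stand-ins. I am also assuming that the choice is driven by the `mapreduce.framework.name` setting and that it falls back to `local` when unset.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical miniature of the provider loop in Cluster.initialize. */
class ProviderSelectionDemo {
    /** Returns a client name, or null if this provider does not handle the conf. */
    interface Provider {
        String create(Map<String, String> conf);
    }

    static class LocalProvider implements Provider {
        public String create(Map<String, String> conf) {
            // Assumption: "local" is the default when the key is unset.
            String framework = conf.getOrDefault("mapreduce.framework.name", "local");
            return "local".equals(framework) ? "LocalJobRunner" : null;
        }
    }

    static class YarnProvider implements Provider {
        public String create(Map<String, String> conf) {
            return "yarn".equals(conf.get("mapreduce.framework.name")) ? "YARNRunner" : null;
        }
    }

    /** Keeps the first non-null client, like the loop with its break. */
    static String pick(List<Provider> providers, Map<String, String> conf) {
        for (Provider provider : providers) {
            String client = provider.create(conf);
            if (client != null) {
                return client;
            }
        }
        throw new IllegalStateException("no provider accepted the configuration");
    }

    public static void main(String[] args) {
        List<Provider> providers = Arrays.asList(new LocalProvider(), new YarnProvider());
        Map<String, String> conf = new HashMap<>();
        System.out.println(pick(providers, conf));
        conf.put("mapreduce.framework.name", "yarn");
        System.out.println(pick(providers, conf));
    }
}
```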
None of these three steps actually submits the job yet, so let's keep going.
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
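//Check once more that the state is DEFINE
ensureState(JobState.DEFINE);
//Settle which API the job uses: the old API still exists, so the job is switched to the new API here (unless old-API classes were configured)
setUseNewAPI();
//Create the Cluster object (with a different runner depending on whether the runtime is local or yarn)
connect();
// The cluster here is the one connect() created
// getJobSubmitter is called with the cluster's file system and the cluster's client
// to create a JobSubmitter, whose submitJobInternal method does the submission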
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
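//Anonymous inner class
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
//This call is important; first, what the official Javadoc says the method does:
// 1. Checks the input and output specifications
// 2. Computes the input splits for the job
// 3, 4. Copies the job's resources to the distributed file system
// 5. Submits the job to the JobTracker and monitors its status
// JobTracker was the 1.x resource scheduler; it is YARN now (so this description is quite dated)
// (Of everything listed, step 2 matters most: it has real logic and a formula)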
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
Let's step into submitJobInternal and analyze it.
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
//Check the output; we'll look at how the output specification is validated first
checkSpecs(job);
// See the maps variable further down
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
......
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// As the name suggests, this writes out our split information
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
......
}
}
Let's click into checkSpecs(job) to see how the validation is done.
private void checkSpecs(Job job) throws ClassNotFoundException,
InterruptedException, IOException {
// Get the job's full configuration (core-site.xml and so on)
JobConf jConf = (JobConf)job.getConfiguration();
// Check the output specification
if (jConf.getNumReduceTasks() == 0 ?
jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
ReflectionUtils.newInstance(job.getOutputFormatClass(),
job.getConfiguration());
//Next we step into this call to see how the check is done
output.checkOutputSpecs(job);
} else {
jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
}
}
public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
//Get the job's output path
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration());
// Throw an exception if the output directory already exists
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
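This is why a job fails immediately when its output directory is left over from an earlier run. The same two checks can be sketched against the local file system with java.nio. `OutputSpecCheck` and `checkOutputDir` are hypothetical names, and the sketch leaves out the delegation-token step.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical local-file-system version of the output directory check. */
class OutputSpecCheck {
    /** Rejects a missing output setting and an output directory that already exists. */
    static void checkOutputDir(Path outDir) throws IOException {
        if (outDir == null) {
            throw new IllegalArgumentException("Output directory not set.");
        }
        if (Files.exists(outDir)) {
            throw new FileAlreadyExistsException(
                "Output directory " + outDir + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        // The current directory always exists, so this check is rejected.
        try {
            checkOutputDir(Paths.get("."));
        } catch (FileAlreadyExistsException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In practice this means either deleting the old output directory before resubmitting or writing each run to a fresh path.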
Then we step back out.
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
//Validate the output specification
checkSpecs(job);
//Get the configuration object
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
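//At this point the job has still not been submitted
// This creates the staging directory (a local one in my run); you can find it under the current working directory (the original post shows a screenshot)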
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
//The directory is still empty; submitting a job has to upload something, so read on
//This returns my current IP: 192.168.56.1
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
//Get the name of my machine
submitHostName = ip.getHostName();
//Record in the configuration which host submitted the job and from which address
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
//Assign an ID to the job being submitted
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
//The job's submit directory
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
//Set some configuration values
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
//Obtain delegation tokens for the submit directory and populate the token cache
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
//Before this call, the .staging directory still contains no files
// Copy and configure the job's files; let's step in
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
......
}
}
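One detail in the excerpt above that is easy to skip is the shuffle secret: a random key generated with the standard javax.crypto KeyGenerator. The sketch below reproduces just that step. The values "HmacSHA1" and 64 bits are my assumption about what SHUFFLE_KEYGEN_ALGORITHM and SHUFFLE_KEY_LENGTH hold, so check JobSubmitter in your Hadoop version. `ShuffleSecretDemo` is a hypothetical class name.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

/** Hypothetical stand-alone version of the shuffle secret generation. */
class ShuffleSecretDemo {
    static final String ALGORITHM = "HmacSHA1"; // assumed value
    static final int KEY_LENGTH_BITS = 64;      // assumed value

    /** Generates a fresh random key and returns its raw bytes. */
    static byte[] newShuffleSecret() {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance(ALGORITHM);
            keyGen.init(KEY_LENGTH_BITS);
            SecretKey key = keyGen.generateKey();
            return key.getEncoded();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Error generating shuffle secret key", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("secret length in bytes: " + newShuffleSecret().length);
    }
}
```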
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
// The upload starts here: a JobResourceUploader is created and uploadFiles is called on it; let's step in
JobResourceUploader rUploader = new JobResourceUploader(jtFs);
rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
// This code has been added so that working directory reset before running
// the job. This is necessary for backward compatibility as other systems
// might use the public API JobConf#setWorkingDirectory to reset the working
// directory.
job.getWorkingDirectory();
}
public void uploadFiles(Job job, Path submitJobDir) throws IOException {
//Get the configuration
Configuration conf = job.getConfiguration();
//Read the replication factor for submitted files (falls back to a default)
short replication =
(short) conf.getInt(Job.SUBMIT_REPLICATION,
Job.DEFAULT_SUBMIT_REPLICATION);
if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
LOG.warn("Hadoop command-line option parsing not performed. "
+ "Implement the Tool interface and execute your application "
+ "with ToolRunner to remedy this.");
}
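//Read the resources passed on the command line (the -files, -libjars and -archives options); my run sets none of them, so we skip this part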
// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
//
// Figure out what fs the JobTracker is using. Copy the
// job to it, under a temporary name. This allows DFS to work,
// and under the local fs also provides UNIX-like object loading
// semantics. (that is, if the job file is deleted right after
// submission, we can still run the submission to completion)
//
// Create a number of filenames in the JobTracker's fs namespace
LOG.debug("default FileSystem: " + jtFs.getUri());
//Throw an exception if the submit directory already exists
if (jtFs.exists(submitJobDir)) {
throw new IOException("Not submitting job. Job directory " + submitJobDir
+ " already exists!! This is unexpected.Please check what's there in"
+ " that directory");
}
//Turn submitJobDir into a fully qualified path
submitJobDir = jtFs.makeQualified(submitJobDir);
//Build a new Path from just the path component; this only exists in memory, nothing is visible on disk yet
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
//Once this line runs, the directory actually exists and is visible: job_local1889043158_0001
//Everything we submit goes into this directory
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
//None of these directories exist in my run
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
//Not set in my run
if (files != null) {
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
String[] fileArr = files.split(",");
for (String tmpFile : fileArr) {
URI tmpURI = null;
try {
tmpURI = new URI(tmpFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
}
}
}
//Not set in my run
if (libjars != null) {
FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
String[] libjarsArr = libjars.split(",");
for (String tmpjars : libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
DistributedCache.addFileToClassPath(
new Path(newPath.toUri().getPath()), conf, jtFs);
}
}
//Not set in my run
if (archives != null) {
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
String[] archivesArr = archives.split(",");
for (String tmpArchives : archivesArr) {
URI tmpURI;
try {
tmpURI = new URI(tmpArchives);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheArchive(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw an uri excpetion
throw new IOException("Failed to create uri for " + tmpArchives, ue);
}
}
}
//Not set in my run (no job jar)
if (jobJar != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
job.setJobName(new Path(jobJar).getName());
}
Path jobJarPath = new Path(jobJar);
URI jobJarURI = jobJarPath.toUri();
// If the job jar is already in a global fs,
// we don't need to copy it from local fs
if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication);
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
}
} else {
LOG.warn("No job jar file set. User classes may not be found. "
+ "See Job or Job#setJar(String).");
}
//The rest adds entries to the distributed cache; we can skip it
addLog4jToDistributedCache(job, submitJobDir);
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
// get DelegationToken for cached file
ClientDistributedCacheManager.getDelegationTokens(conf,
job.getCredentials());
}
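The files and archives branches above share one pattern: split a comma-separated list, parse each entry as a URI, and keep the `#fragment` separately. The sketch below isolates that parsing in plain Java. `TmpFilesParser` is a hypothetical helper. Treating the fragment as the link name, with the file's base name as the fallback, is my reading of how the distributed cache uses it, not something the excerpt states.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for a comma-separated resource list such as "tmpfiles". */
class TmpFilesParser {
    /** Maps link name to source URI; the #fragment, when present, is the link name. */
    static Map<String, URI> parse(String files) {
        Map<String, URI> result = new LinkedHashMap<>();
        if (files == null || files.isEmpty()) {
            return result;
        }
        for (String entry : files.split(",")) {
            URI uri;
            try {
                uri = new URI(entry);
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException(e);
            }
            String path = uri.getPath();
            String baseName = path.substring(path.lastIndexOf('/') + 1);
            String linkName = uri.getFragment() != null ? uri.getFragment() : baseName;
            result.put(linkName, uri);
        }
        return result;
    }

    public static void main(String[] args) {
        String files = "file:///tmp/dict.txt#words,hdfs://nn:8020/data/a.csv";
        parse(files).forEach((name, uri) -> System.out.println(name + " <- " + uri));
    }
}
```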
Let's step back out.
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
// The upload starts here: a JobResourceUploader is created and uploadFiles is called on it (we just walked through it)
JobResourceUploader rUploader = new JobResourceUploader(jtFs);
rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
// This code has been added so that working directory reset before running
// the job. This is necessary for backward compatibility as other systems
// might use the public API JobConf#setWorkingDirectory to reset the working
// directory.
//The directory has been created and the relevant files copied over; that part is done
job.getWorkingDirectory();
}
Let's go back.
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
//Check the output (covered above)
checkSpecs(job);
// See the maps variable further down
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
......
//Path of the job's configuration file (job.xml) inside the submit directory
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
//Compute the logical splits
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
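The try/finally at the end of this method is worth a second look: if no status comes back, whether because submitJob returned null or because something threw, the staging directory is deleted. Here is that cleanup pattern in plain Java on the local file system. `StagingCleanupDemo` and its `submit` method are hypothetical, and the submitter is just a `Supplier<String>` standing in for submitClient.submitJob.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.function.Supplier;
import java.util.stream.Stream;

/** Hypothetical miniature of "clean up the staging area unless a status came back". */
class StagingCleanupDemo {
    /** Runs the submitter; deletes stagingDir when it yields no status or throws. */
    static String submit(Path stagingDir, Supplier<String> submitter) {
        String status = null;
        try {
            status = submitter.get();
            if (status == null) {
                throw new IllegalStateException("Could not launch job");
            }
            return status;
        } finally {
            if (status == null) {
                deleteRecursively(stagingDir);
            }
        }
    }

    /** Deletes children before parents by walking the tree in reverse order. */
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("staging");
        System.out.println(submit(dir, () -> "RUNNING") + ", kept: " + Files.exists(dir));
        deleteRecursively(dir);
    }
}
```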
Let's step into the writeSplits method.
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
//Pick the method based on whether the new or old API is in use; we use the new one, so writeNewSplits runs
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
Let's click into writeNewSplits.
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
//Get the Configuration from the job
Configuration conf = job.getConfiguration();
//Create an InputFormat through reflection; look at the first argument, job.getInputFormatClass(), first
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
First look at job.getInputFormatClass(), which is implemented in the parent class JobContextImpl.
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
// Read the InputFormat from the configuration: the user's choice if one was set, otherwise TextInputFormat by default
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
Then we step back out to the previous method.
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
//Get the Configuration from the job
Configuration conf = job.getConfiguration();
//Create an InputFormat through reflection from job.getInputFormatClass()
//At this point we have an InputFormat instance; by default the framework uses TextInputFormat
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
//The InputFormat computes the splits; this is the important part
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
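The sort in this method puts the biggest splits first, as the source comment says. A small plain-Java sketch of that ordering follows. `SplitSortDemo` is a hypothetical class, and it sorts bare lengths instead of InputSplit objects.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical illustration of ordering splits by size, largest first. */
class SplitSortDemo {
    /** Returns the given split lengths sorted in descending order. */
    static Long[] sortLargestFirst(long... lengths) {
        Long[] boxed = Arrays.stream(lengths).boxed().toArray(Long[]::new);
        Arrays.sort(boxed, Comparator.reverseOrder());
        return boxed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortLargestFirst(10, 128, 64)));
    }
}
```

Running the main method prints `[128, 64, 10]`.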
Stepping into getSplits, we find that the default TextInputFormat does not define it; it is inherited from FileInputFormat, which holds the core split computation.
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
//minSize is the larger of getFormatMinSplitSize() and getMinSplitSize(job)
//Clicking into getFormatMinSplitSize() shows that it returns 1
// getMinSplitSize(job) returns the user's SPLIT_MINSIZE if configured, otherwise 1
// So by default minSize = Math.max(1, 1) = 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//maxSize: likewise, the user's SPLIT_MAXSIZE is used if configured;
//otherwise the default is Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// List every file in the input (a list of file statuses)
List<FileStatus> files = listStatus(job);
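// Next, an enhanced for loop iterates over each file
// The body of this loop computes the splits: for each file it produces all of that file's splits
// Splits never span files: one split cannot read from two files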
for (FileStatus file: files) {
//Get the file's path
Path path = file.getPath();
//Get the file's size
long length = file.getLen();
if (length != 0) {
//Block locations of the file
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
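//Ask the path for its FileSystem, passing in the configuration (a distributed file system on a cluster)
FileSystem fs = path.getFileSystem(job.getConfiguration());
//Through that FileSystem, call getFileBlockLocations with the file,
//from offset 0 up to its length, to get back all of the file's blocks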
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//If the file can be split; some files cannot, for example those compressed with a non-splittable codec
if (isSplitable(job, path)) {
//Get the file's block size
long blockSize = file.getBlockSize();
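//Split size: computeSplitSize computes it per file, so it can differ from file to file
//It takes the block size plus the min and max values from above; the formula is simple:
// Math.max(minSize, Math.min(maxSize, blockSize));
// Take the smaller of maxSize and blockSize, then the larger of that result and minSize
// By default maxSize is Long.MAX_VALUE and minSize is 1,
// and blockSize is never above Long.MAX_VALUE or below 1,
// so the formula returns blockSize
// Conclusion: by default the split size equals the block size
// Block sizes can differ between files, so split sizes can differ too:
// if one MR job reads two files with different block sizes, their default split sizes differ
// To get more map tasks, set maxSize smaller than the block size
// To get fewer map tasks, set minSize larger than the block size
//The default block size on the local file system is 32 MB
//This is only the initial split size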
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
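//Start with the whole file length in bytesRemaining
long bytesRemaining = length;
// Each iteration subtracts one split size; the loop runs while the remainder is more than SPLIT_SLOP (1.1) times the split size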
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
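//A note on length-bytesRemaining: it is the offset of the next split, 0 on the first iteration,
//one split size on the second,
//two split sizes on the third. For example,
//with a file of length 25 and a split size of 5:
//1st: 25 - 25 = 0
//2nd: 25 - (25 - 5) = 5
//3rd: 25 - (20 - 5) = 10
//4th: 25 - (15 - 5) = 15
// ... and so on
//So length-bytesRemaining gives us the split's offset
//getBlockIndex then uses the block locations and that offset to find the block's index
//We step into that method further down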
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
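The arithmetic in this method is easier to follow with real numbers. Below is a plain-Java sketch of the two pieces that matter: the split size formula and the loop that carves a file into splits. `SplitPlanner` and `plan` are hypothetical names. The value 1.1 for SPLIT_SLOP is what FileInputFormat uses to my knowledge. Zero-length files, which the real method handles in a separate branch, simply produce no splits here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone version of the split arithmetic. */
class SplitPlanner {
    static final double SPLIT_SLOP = 1.1; // assumed to match FileInputFormat

    /** Same formula as in the walkthrough: max(minSize, min(maxSize, blockSize)). */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Returns {offset, length} pairs for one file of the given length. */
    static List<long[]> plan(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        // Keep cutting full splits while more than 1.1 split sizes remain.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        // Whatever is left (at most 1.1 split sizes) becomes the last split.
        if (bytesRemaining != 0) {
            splits.add(new long[] {fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long splitSize = computeSplitSize(128, 1, Long.MAX_VALUE);
        for (long[] s : plan(260, splitSize)) {
            System.out.println("offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

With a split size of 128, a file of length 260 gives two splits, (0, 128) and (128, 132). The last split is larger than 128 because 132/128 is below the 1.1 threshold. For the same reason a file of length 129 yields a single split instead of a full one plus a tiny one.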
Here is the getBlockIndex method mentioned above.
//The parameters tell us what we need: blkLocations is the block information
protected int getBlockIndex(BlockLocation[] blkLocations,
//offset is the split's offset (length-bytesRemaining, computed as described above)
long offset) {
//Loop over all the blocks
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
// (the original post illustrates this with a diagram)
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
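}
//(the excerpt stops here; when no block contains the offset, the rest of the method throws an IllegalArgumentException)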
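In short, this method finds the association: which block contains the split (more precisely, the split's starting offset). The original post shows this with a diagram.
Let's continue the analysis.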
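Before moving on, here is the lookup as a stand-alone sketch, since the diagram is not reproduced in this text. Blocks are modeled as plain `{offset, length}` pairs instead of BlockLocation objects, and `BlockIndexDemo` is a hypothetical class name.

```java
/** Hypothetical stand-alone version of the block lookup. */
class BlockIndexDemo {
    /**
     * blocks[i] holds {offset, length} of block i.
     * Returns the index of the block whose byte range contains the offset.
     */
    static int getBlockIndex(long[][] blocks, long offset) {
        for (int i = 0; i < blocks.length; i++) {
            long start = blocks[i][0];
            long end = start + blocks[i][1]; // exclusive
            if (start <= offset && offset < end) {
                return i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is outside of file");
    }

    public static void main(String[] args) {
        // A 300-byte file stored as blocks of 128, 128 and 44 bytes.
        long[][] blocks = {{0, 128}, {128, 128}, {256, 44}};
        System.out.println(getBlockIndex(blocks, 128));
    }
}
```

A split starting at offset 128 lands in block 1, so that block's hosts become the split's preferred locations.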
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
//From this call we know which block contains the split; blkIndex is that block's index
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
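//Create a split. We are in a loop over one file, so each iteration creates one split and adds it to the list
//A split carries five things:
// 1. The file it belongs to
// 2. Its offset
// 3. Its size
// 4. Its location (the hosts holding the block)
// 5. The hosts that have the block cached in memory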
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
//That is: take the block that contains the split and give the block's location information to the split
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
That completes the analysis of the split code. Back to where we were.
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
......
//Path of the job's configuration file (job.xml) inside the submit directory
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
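//Compute the logical splits
int maps = writeSplits(job, submitJobDir);
//After the computation the split information sits in our job directory; the split count is recorded as the number of maps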
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
//Record the queue's administrator ACL in the job configuration
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
//Remove token referrals from the configuration
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
//This comes next; let's step in
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
private void writeConf(Configuration conf, Path jobFile)
throws IOException {
// Write job file to JobTracker's fs
//Open an output stream on the file system
FSDataOutputStream out =
FileSystem.create(jtFs, jobFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
//Write the XML file into our job directory; let's look in that directory to see what the XML contains
conf.writeXml(out);
} finally {
out.close();
}
}
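For readers who have never opened a Hadoop configuration file, the result is a list of name/value properties wrapped in a `configuration` element. The sketch below writes that general shape from a plain map. It is a simplification under my own assumptions: `JobXmlSketch` is a hypothetical class, and the real job.xml produced by writeXml carries more per-property detail than the bare name and value shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified writer for a Hadoop-style configuration file. */
class JobXmlSketch {
    /** Escapes the characters that are special in XML text. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Renders every entry as a property element with a name and a value. */
    static String toXml(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n");
        for (Map.Entry<String, String> e : conf.entrySet()) {
            sb.append("  <property><name>").append(escape(e.getKey()))
              .append("</name><value>").append(escape(e.getValue()))
              .append("</value></property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("mapreduce.job.maps", "2");
        conf.put("mapreduce.framework.name", "local");
        System.out.print(toXml(conf));
    }
}
```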
The split files were generated when the split step ran; now look at job.xml.
It contains all of the Hadoop configuration in effect for the job. Let's go back.
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
......
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
//This is where the job is finally submitted; a status is returned
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
That completes the trace through the client. To summarize:
waitForCompletion()
submit();
// 1. Establish the connection
    connect();
        // 1) Create the proxy used to submit the job
        new Cluster(getConfiguration());
            // (1) Determine whether the job runs locally or on YARN
            initialize(jobTrackAddr, conf);
// 2. Submit the job
    submitter.submitJobInternal(Job.this, cluster)
        // 1) Create the staging path used to submit data to the cluster
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        // 2) Get the job ID and create the job path
        JobID jobId = submitClient.getNewJobID();
        // 3) Copy the jar and other resources to the cluster
        copyAndConfigureFiles(job, submitJobDir);
            rUploader.uploadFiles(job, jobSubmitDir);
        // 4) Compute the splits and write the split plan files
        writeSplits(job, submitJobDir);
            maps = writeNewSplits(job, jobSubmitDir);
                input.getSplits(job);
        // 5) Write the XML configuration file to the staging path
        writeConf(conf, submitJobFile);
            conf.writeXml(out);
        // 6) Submit the job and return the submission status
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Working through the whole trace yourself will help a lot with later study.