文章目录
3.1.2 Job提交流程源码和切片源码详解
3.1.2.0 Job提交(包括切片)流程思维导图
3.1.2.1 Job提交流程源码详解
1. waitForCompletion()
- 我们从
Driver类, job.waitForCompletion
处打断点进入方法内.
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
判断状态是否为`DEFINE`
//JobState.DEFINE 相当于还没有被占用,
//在后面job提交过程中,它会将这个状态置为JobState.RUNNING(正在运行
if (this.state == Job.JobState.DEFINE) {
this.submit();
}
if (verbose) {
this.monitorAndPrintJob(); // verbose为true, 就会在控制台打印具体的job相关信息
} else {
.......
}
return this.isSuccessful();
}
2. submit()
- 在
this.submit()
处打断点, 进入到submit()
内部
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
this.ensureState(Job.JobState.DEFINE); //再次确认Job此时的状态
this.setUseNewAPI();// 解决新旧API的兼容性
this.connect(); //此处继续打断点, 等待进入
final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
return submitter.submitJobInternal(Job.this, Job.this.cluster);
}
});
this.state = Job.JobState.RUNNING;
LOG.info("The url to track the job: " + this.getTrackingURL());
}
3. connect()
- 在submit()中继续往下执行, 跳过
ensureState(), setUseNewAPI()
进入到connect()
内部;
- connect() 方法用于网络连接,根据运行环境的不同创建不同的对象,本地创建LocalJobRunner, 集群上创建的是yarnRunner 。
- connect() 方法首先判断,集群是否已经创建, 如果集群为空就创建一个集群
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
if (this.cluster == null) {
this.cluster = (Cluster)this.ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
return new Cluster(Job.this.getConfiguration()); //断点, 集群的初始化工作
}
});
}
}
- 在connect方法的
return new Cluster(Job.this.getConfiguration());
处打上断点, 并按住ctrl点击Cluster进入到Cluster类,
- 集群的创建和初始化
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
this.fs = null;
this.sysDir = null;
this.stagingAreaDir = null;
this.jobHistoryDir = null;
this.providerList = null;
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
this.initialize(jobTrackAddr, conf); //继续打断点
}
- 在
this.initialize(jobTrackAddr, conf);
上再打上一个断点.
4. initialize()
- 往下跳过执行直到来到
initialize方法
上, 该方法的唯一任务是确定客户端通信协议提供者
,并通过该对象的create方法构造客户端通信协议对象实例client。核心目的也就是要判断当前客户端是本地客户端还是Yarn客户端
。
- initialize的方法内部:
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
this.initProviderList();
IOException initEx = new IOException("Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.");
if (jobTrackAddr != null) {
LOG.info("Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
}
Iterator var4 = this.providerList.iterator();
while(var4.hasNext()) {
ClientProtocolProvider provider = (ClientProtocolProvider)var4.next();
LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
this.clientProtocolProvider = provider;
this.client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
break;
}
LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol");
} catch (Exception var9) {
String errMsg = "Failed to use " + provider.getClass().getName() + " due to error: ";
initEx.addSuppressed(new IOException(errMsg, var9));
LOG.info(errMsg, var9);
}
}
if (null == this.clientProtocolProvider || null == this.client) {
throw initEx;
}
}
ClientProtocol getClient() {
return this.client;
}
Configuration getConf() {
return this.conf;
}
public synchronized void close() throws IOException {
this.clientProtocolProvider.close(this.client);
}
- 了解了上面的开胃小菜(客户端与集群的连接过程), 我们正式进入Job提交过程的源码
5. submitJobInternal()
- 从第5步不断的跳过直到
回到submit()
方法内, 在submitJobInternal()
上打断点, 如下图所示:
- 进入
submitJobInternal
方法体内我们继续在下图两处打上断点:
6. checkSpecs()
- 首先我们进入
checkSpecs()方法
, 此方法的作用是用来检查输出路径;
-
打断点查看
output.checkOutputSpecs(job);
-
checkOutputSpecs(job)的方法体
7. submitJobInternal, JobId的生成.
- 检查输出目录合法性之后, 我们回到
submitJobInternal
继续往下执行, 具体工作如下图所示:
注意: 此时还未在主机中真正的创建以JobId为名称的目录!!!
8. copyAndConfigureFiles
-
在
submitJobInternal()中
继续往下跳过执行, 直到看到copyAndConfigureFiles
,打断点如下所示:
-
从第九步, 先进入
copyAndConfigureFiles
, 然后在uploadResources(job, jobSubmitDir)
这一行继续打断点并进入, 最后进入uploadResourcesInternal(Job job, Path submitJobDir)
方法.
- 在
uploadResourcesInternal(Job job, Path submitJobDir)
方法中完成了对job临时工作目录的创建工作. - 同时在此方法的末尾, 完成了把相关的文件提交到集群.(本地集群不会上传jar包)
7. submitJobInternal, 切片的生成,配置文件的生成
-
从
copyAndConfigureFiles()
中, 我们完成了对job临时工作目录的创建, 并提交了一些必要的文件到集群中.回到submitJobInternal
继续往下执行,writeSplits()
-
继续往下执行,
writeConf()
-
返回到submit()
-
从submit()中退出, 兜兜转转, 又回到了最初的起点
waitForCompletion()
,
- 返回成功信息, 并清空临时的job工作目录
总结: Job提交过程
主要就是将Jar包(非本地集群)\切片文件\XML配置文件提交到MapReduce运行的服务器,之后监控Job运行状态,返回运行信息
3.1.2.2 切片源码详解(以FileInutFormat为例)
1. submitJobInternal(job, cluster)
- 在
submitJobInternal(job, cluster)
方法中, 我们打断点如下图所示:
2. writeSplits(job, submitDir)
- 进入到
writeSplits(job, submitDir)
内部, 打断点如图所示:
3. writeNewSplits(job, jobSubmitDir) -> getSplits()
- 进入到
writeNewSplits(job, jobSubmitDir))
, 继续在getSplits()
上打断点, 并进入到FileInputFormat类的getSplits()方法内部
, 此方法主要是完成对各个输入文件的切片操作, 这个实现类的切片规则详见本文的末尾.
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = (new StopWatch()).start();
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));//切片最小尺寸
long maxSize = getMaxSplitSize(job); //切片最大尺寸
List<InputSplit> splits = new ArrayList();
List<FileStatus> files = this.listStatus(job);
boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", false);
Iterator var10 = files.iterator();
//对多个文件分别进行遍历切片处理, 一个文件就是迭代器的一次迭代遍历,
//每个文件单独执行切片过程
while(true) {
while(true) {
while(true) {
FileStatus file;
do {
if (!var10.hasNext()) {
job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
file = (FileStatus)var10.next();
} while(ignoreDirs && file.isDirectory());
Path path = file.getPath();
long length = file.getLen();
if (length != 0L) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus)file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0L, length);
}
//判断文件是否是可以切割的
if (this.isSplitable(job, path)) {
//获取块大小(因为我们此时采用的是本地集群, 资源不是很充足, 所以块大小默认是32MB)(33554432-32MB)
long blockSize = file.getBlockSize();
//获取切片大小, 也是32MB,因为块大小=切片大小
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining;//每次切片后, 剩余文件块的大小
int blkIndex;
/重点: 一个文件32.1MB, 切片大小32MB, 是否应该是切片为一片32MB, 一片0.1MB呢?
//解决办法如下面代码所示: 剩余的文件块大小(bytesRemaining)除以切片大小(splitSize),
//如果>1.1的话, 就继续切片, 否则不切了
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}
if (bytesRemaining != 0L) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}
} else {
if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
}
splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
}
} else {
splits.add(this.makeSplit(path, 0L, length, new String[0]));
}
}
}
}
}
4. writeNewSplits(job, jobSubmitDir) --> createSplits(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
- 切片操作完成后, 方法退回到writeNewSplits()并继续向下执行
- 执行到断点后, 可以发现, 切片文件已经在临时工作目录中建立了
切片重点,总结
- 每一个文件都是单独进行切片的;
- 切片大小取决于Math.max(minSize, Math.min(maxSize, blockSize))
- 判断是否还继续进行切片的标准就是:
每次切片后剩余文件大小/切片大小的值是否 > 1.1