在 Client 主要任务之一是计算 job 作业的 splits 切片清单
Client 使用 submit() 方法向 YARN 提交作业。submit() 内部是通过 submitJobInternal(job,cluster) 方法完成实质性的作业提交。submitJobInternal(job,cluster) 方法首先会向 HDFS 上传三个文件:Jar 包,splits 切片清单,xml 配置文件。
然后在该方法体内的第197行调用 writeSplits() 方法计算切片数量
使用新的API,打开该方法的实现类
打开父类 FileInputFormat 的 getSplits() 方法
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
/**
* protected long getFormatMinSplitSize() {
* return 1;
* }
*
* 如果设置了切片的最小值,则返回该值,否则返回默认大小 1
* public static long getMinSplitSize(JobContext job) {
* return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
* }
* 故 minSize 默认大小值为 1
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
/**
* 如果设置了切片的最大值,则返回该值,否则返回默认大小 long 的最大值
* public static long getMaxSplitSize(JobContext context) {
* return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
* }
* 故 maxSize 默认大小值为 Long.MAX_VALUE
*/
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// 返回一个包含了输入路径下的所有文件元数据对象的数组(相当于 ls -l)
List<FileStatus> files = listStatus(job);
// 对每一个文件迭代,计算 splits 切片
for (FileStatus file: files) {
// 返回文件在 HDFS 中的路径
Path path = file.getPath();
// 返回文件大小
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
// 得到文件 block 块的位置信息
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
// 得到 block 块的大小
long blockSize = file.getBlockSize();
/**
* protected long computeSplitSize(long blockSize, long minSize,
* long maxSize) {
* return Math.max(minSize, Math.min(maxSize, blockSize));
* }
* 从该方法中可以得到,split 切片默认大小为 block 块的大小
*
* 要将 split 切片调小,将配置中的 maxSize 调小;否则将 minSize 调大
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// 通过该循环将计算出每个文件 splits 清单
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// 每个 split 包含了(hdfs中的路径,偏移量,大小,节点数组)
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}