This article starts from job submission and walks through the split-related source code step by step.
Block: a Block is the unit into which HDFS physically divides stored data.
Split: a split divides the input only logically; the data is not physically cut into pieces on disk.
File path: org/apache/hadoop/mapreduce/JobSubmitter.java
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
writeSplits
Reads the job configuration and calls the method that matches the Mapper API version in use.
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
  // JobContextImpl
  JobConf jConf = (JobConf) job.getConfiguration();
  int maps;
  // true when the new (org.apache.hadoop.mapreduce) Mapper API is used
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
writeNewSplits
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  // get the configuration
  Configuration conf = job.getConfiguration();
  // instantiate the InputFormat class via reflection
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
getInputFormatClass
By default this returns TextInputFormat, whose split mechanism plans splits file by file.
File path: org/apache/hadoop/mapreduce/task/JobContextImpl.java
public Class<? extends InputFormat<?,?>> getInputFormatClass()
    throws ClassNotFoundException {
  return (Class<? extends InputFormat<?,?>>)
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
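Since the input format is looked up from INPUT_FORMAT_CLASS_ATTR with TextInputFormat as the fallback, a job driver can override the default explicitly. A minimal sketch of such a driver (the job name and input path are illustrative, not from the source):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class DriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "splits-demo"); // job name is illustrative
    // overrides the TextInputFormat default that getInputFormatClass() would return
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/input")); // hypothetical path
  }
}
```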
input.getSplits(job)
Overall flow:
- Locate the directory where the input data is stored.
- Iterate over the files in the directory, planning splits file by file.
- For each file:
  - Get the file size.
  - Compute the split size; by default, split size = blocksize.
  - Before cutting each split, check whether the remaining bytes are more than 1.1 times the split size; if not, the remainder becomes one final split.
  - Write the split information into a split plan file.
- The core of split planning happens in the getSplits() method.
- Splitting divides the input data only logically; nothing is re-stored on disk as separate pieces. An InputSplit records only the split's metadata, such as its start offset, length, and the list of nodes it resides on.
- Note: a block is data physically stored in HDFS, while a split is a logical division of that data.
- The split plan file is submitted to YARN, where the MRAppMaster uses it to compute the number of map tasks to launch.
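The per-file steps above can be sketched in plain Java. This is a simplified model of the planning loop (sizes in bytes, locations omitted), not Hadoop code:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPlanSketch {
  static final double SPLIT_SLOP = 1.1;

  // returns the planned split lengths for one file (simplified model)
  static List<Long> planSplits(long length, long splitSize) {
    List<Long> splits = new ArrayList<>();
    long bytesRemaining = length;
    // keep cutting full-size splits while the remainder exceeds 1.1x the split size
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits.add(splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits.add(bytesRemaining); // the tail becomes one final split
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    // a 300 MB file with a 128 MB split size yields 3 splits: 128 MB, 128 MB, 44 MB
    System.out.println(planSplits(300 * mb, 128 * mb).size());
  }
}
```

With a 300 MB file and a 128 MB split size, two full splits are cut (leaving 172 MB, then 44 MB), and the 44 MB remainder becomes the third split.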
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        // get the file's block size
        long blockSize = file.getBlockSize();
        // compute the split size
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // ... (remainder omitted; the method ultimately returns the splits list)
}
Getting the configured maximum and minimum split sizes
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

protected long getFormatMinSplitSize() {
  return 1;
}

public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}

// configuration keys (can be set in the job's XML configuration)
public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
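Besides setting these keys in the XML configuration, they can be set from a driver through FileInputFormat's static helpers. A sketch of such a driver fragment (the job name is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-size-demo");
    // equivalent to setting mapreduce.input.fileinputformat.split.minsize / .maxsize
    FileInputFormat.setMinInputSplitSize(job, 1L);
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // cap splits at 64 MB
  }
}
```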
Computing the split size: computeSplitSize
By default, split size = blocksize.
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
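Given this formula, the only ways to change the split size are raising minSize above the block size or lowering maxSize below it. A small standalone check that mirrors the formula (plain Java, not Hadoop code):

```java
public class ComputeSplitSizeSketch {
  // same clamp formula as FileInputFormat.computeSplitSize
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024; // 128 MB block
    // defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split size = block size
    System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));
    // lowering maxSize shrinks the split; raising minSize grows it
    System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));
    System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
  }
}
```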
SPLIT_SLOP
Before cutting each split, check whether the remaining bytes exceed 1.1 times the split size; if not, the remainder is kept as a single split.
private static final double SPLIT_SLOP = 1.1; // 10% slop
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
      blkLocations[blkIndex].getHosts(),
      blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}
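The 1.1 factor exists to avoid a tiny trailing split: with a 128 MB split size, a 129 MB file stays one split, while a 141 MB file (more than 1.1 × 128 MB ≈ 140.8 MB) is cut into two. A standalone sketch of just the counting logic (not Hadoop code):

```java
public class SplitSlopSketch {
  private static final double SPLIT_SLOP = 1.1; // 10% slop

  // counts how many splits the while/tail logic above would produce
  static int countSplits(long length, long splitSize) {
    int count = 0;
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      count++;
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      count++; // a remainder of at most 1.1x splitSize becomes the last split
    }
    return count;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    System.out.println(countSplits(129 * mb, 128 * mb)); // 1
    System.out.println(countSplits(141 * mb, 128 * mb)); // 2
  }
}
```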