一、基本知识
Mapreduce 处理的数据都必须实现或间接实现 org.apache.hadoop.io.Writable 接口
二、运行流程
- main函数中有这样一句话:
System.exit(job.waitForCompletion(true) ? 0 : 1);
查看waitForCompletion()方法可以看到这样一句代码:
if (state == JobState.DEFINE) {
submit();
}
submit()方法中红色代码触发内部的提交过程的代码
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);//触发内部的提交过程的代码
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
这个内部提交完成了以下几件事情:
1.检查输入输出声明是否合法
2.计算InputSplit过程,代码如下:
private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
可见这里调用了InputFormat,得到了List splits,并进行了排序操作,且大块的split会优先处理。
3. DistributedCache 操作,就是把用户指定需要缓存的文件加载到各个节点上。
4. 任务jar包的拷贝分发和缓存,和3操作的原理一样。
5. 通过private ClientProtocol submitClient 真正的提交任务到集群,也就是前面的步骤还是一些准备工作。