文章目录
/**
 * Submits this job to the cluster.
 *
 * <p>Checks that the job is in the {@code DEFINE} state, settles the API mode,
 * connects to the cluster, and then runs the internal submission through
 * {@code ugi.doAs}. Afterwards the returned status is stored, the job state is
 * set to {@code RUNNING}, and the tracking URL is logged.
 *
 * @throws IOException declared by the steps invoked here
 * @throws InterruptedException declared by the connect and submission steps
 * @throws ClassNotFoundException declared by the connect and submission steps
 */
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  // Submission is only permitted while the job is still in the DEFINE state.
  ensureState(JobState.DEFINE);
  // Settle the new-API / old-API flags before going any further.
  setUseNewAPI();
  // Establish the cluster connection.
  connect();

  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  final PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };

  status = ugi.doAs(submitAction);
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
ensureState 确保状态
/**
 * Verifies that the job is in the expected state.
 *
 * @param state the state the job is required to be in
 * @throws IllegalStateException if the job's current state differs from
 *     {@code state}, or if {@code state} is {@code RUNNING} while no cluster
 *     is attached
 */
private void ensureState(JobState state) throws IllegalStateException {
  final boolean inExpectedState = (this.state == state);
  if (!inExpectedState) {
    throw new IllegalStateException(
        "Job in state " + this.state + " instead of " + state);
  }

  // A job checked for RUNNING must also have a cluster attached.
  final boolean runningWithoutCluster =
      (cluster == null) && (state == JobState.RUNNING);
  if (runningWithoutCluster) {
    throw new IllegalStateException(
        "Job in state " + this.state
            + ", but it isn't attached to any job tracker!");
  }
}
setUseNewAPI
默认使用新的 API,除非已显式设置,或者配置中使用了旧的 mapper 或 reducer 属性。
/**
 * Settles whether the map and reduce sides use the new API, then checks that
 * no attribute belonging to the other API flavour is configured.
 *
 * <p>If "mapred.mapper.new-api" is unset it is set to {@code true} exactly when
 * "mapred.mapper.class" has no value. The same is done for
 * "mapred.reducer.new-api" against "mapred.reducer.class", but only when the
 * job has a non-zero number of reduce tasks.
 *
 * @throws IOException declared by the signature; presumably raised by
 *     {@code ensureNotSet} when a conflicting attribute is set (confirm
 *     against its definition)
 */
private void setUseNewAPI() throws IOException {
  final int reduceCount = conf.getNumReduceTasks();
  final boolean hasReducers = reduceCount != 0;
  final String legacyMapperKey = "mapred.mapper.class";
  final String legacyReducerKey = "mapred.reducer.class";

  // ---- map side ----
  final boolean noLegacyMapper = conf.get(legacyMapperKey) == null;
  conf.setBooleanIfUnset("mapred.mapper.new-api", noLegacyMapper);
  if (!conf.getUseNewMapper()) {
    final String mode = "map compatability";
    ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
    ensureNotSet(MAP_CLASS_ATTR, mode);
    // With reducers the partitioner is checked; a map-only job checks the output format.
    ensureNotSet(hasReducers ? PARTITIONER_CLASS_ATTR : OUTPUT_FORMAT_CLASS_ATTR, mode);
  } else {
    final String mode = "new map API";
    ensureNotSet("mapred.input.format.class", mode);
    ensureNotSet(legacyMapperKey, mode);
    ensureNotSet(
        hasReducers ? "mapred.partitioner.class" : "mapred.output.format.class", mode);
  }

  // ---- reduce side (skipped entirely for map-only jobs) ----
  if (!hasReducers) {
    return;
  }
  final boolean noLegacyReducer = conf.get(legacyReducerKey) == null;
  conf.setBooleanIfUnset("mapred.reducer.new-api", noLegacyReducer);
  if (!conf.getUseNewReducer()) {
    final String mode = "reduce compatability";
    ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
    ensureNotSet(REDUCE_CLASS_ATTR, mode);
  } else {
    final String mode = "new reduce API";
    ensureNotSet("mapred.output.format.class", mode);
    ensureNotSet(legacyReducerKey, mode);
  }
}
connect
/**
 * Creates the {@code Cluster} for this job if none has been created yet.
 *
 * <p>When {@code cluster} is already set this is a no-op; otherwise a new
 * {@code Cluster} is built from the job's configuration inside
 * {@code ugi.doAs}.
 *
 * @throws IOException declared by the cluster-creation action
 * @throws InterruptedException declared by the cluster-creation action
 * @throws ClassNotFoundException declared by the cluster-creation action
 */
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster != null) {
    return;
  }

  final PrivilegedExceptionAction<Cluster> openCluster =
      new PrivilegedExceptionAction<Cluster>() {
        public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
          // Build the cluster handle from this job's configuration.
          return new Cluster(getConfiguration());
        }
      };
  cluster = ugi.doAs(openCluster);
}
initialize
判断是本地、yarn、还是远程。
private void initialize(InetSocketAddress jobTrackAddr, Configurationconf) throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: ", e);
}
}
}
提交 job
getStagingDir
创建向集群提交数据所用的 Staging 路径
getStagingAreaDir
获取 Staging 临时目录
/**
 * Returns the staging area directory, asking the client for it the first time
 * and reusing the stored {@code Path} afterwards.
 *
 * @return the staging area directory
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
public Path getStagingAreaDir() throws IOException, InterruptedException {
  if (stagingAreaDir != null) {
    return stagingAreaDir;
  }
  // Not resolved yet: build the path from the client's staging area string.
  stagingAreaDir = new Path(client.getStagingAreaDir());
  return stagingAreaDir;
}
getLoginUser
/**
 * Returns the login user, triggering a login via
 * {@code loginUserFromSubject(null)} when none has been recorded yet.
 *
 * @return the current value of {@code loginUser}
 * @throws IOException declared by the signature
 */
public synchronized static UserGroupInformation getLoginUser() throws IOException {
  if (loginUser != null) {
    return loginUser;
  }
  // No login user recorded yet: log in with a null subject.
  loginUserFromSubject(null);
  return loginUser;
}
mkdirs
创建目录
// Create the staging area directory, passing a permission object built from JOB_DIR_PERMISSION.
fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION));
/** Permission for the job submission directory, which is private: octal 0700 (rwx------). */
public static final FsPermission JOB_DIR_PERMISSION =
    FsPermission.createImmutable((short) 0700);

/** Permission for job files, world-readable and owner-writable: octal 0644 (rw-r--r--). */
public static final FsPermission JOB_FILE_PERMISSION =
    FsPermission.createImmutable((short) 0644);
getLocalHost & getNewJobID
获取 jobid ,并创建 job 路径
// Ask the submit client for a new job id and record it on the job.
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Build the per-job submit path from the staging area and the job id string.
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
copyAndConfigureFiles
拷贝 jar 包到集群
/**
 * Uploads the job's files to the submit directory and then touches the job's
 * working directory.
 *
 * @param job the job whose files are uploaded
 * @param jobSubmitDir the directory the files are uploaded to
 * @throws IOException declared by the signature
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
    throws IOException {
  new JobResourceUploader(jtFs).uploadFiles(job, jobSubmitDir);

  // Fetch the working directory; when none is set this call sets it to the
  // filesystem working directory. The call is kept so the working directory
  // is reset before the job runs, which backward compatibility requires:
  // other systems might use the public API JobConf#setWorkingDirectory to
  // reset it.
  job.getWorkingDirectory();
}
writeSplits
计算切片,生成切片规划文件。我选择的文件很小,maps = 1。
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// The value returned by writeSplits is stored as the job's map count and
// logged as the number of splits.
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
writeConf(conf, submitJobFile)
向 Staging 路径写入 xml 配置文件
/**
 * Writes the configuration as XML to the given job file on {@code jtFs}.
 *
 * <p>The file is created with a permission built from
 * {@code JobSubmissionFiles.JOB_FILE_PERMISSION}, and the stream is closed
 * whether or not the write succeeds.
 *
 * @param conf the configuration to serialise
 * @param jobFile the destination path of the job file
 * @throws IOException declared by the signature
 */
private void writeConf(Configuration conf, Path jobFile)
    throws IOException {
  final FsPermission jobFilePermission =
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION);
  final FSDataOutputStream xmlStream =
      FileSystem.create(jtFs, jobFile, jobFilePermission);
  try {
    conf.writeXml(xmlStream);
  } finally {
    xmlStream.close();
  }
}
submitClient.submitJob
提交 job,返回提交状态
// Submit through the submit client, passing the job id, the submit directory
// (as a string) and the job's credentials; keep the returned status.
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
jtFs.delete
删除 Staging 临时目录中的文件