// Usage sketch: parse one MapReduce job-history (.jhist) file and fetch its per-task info.
// Example historyFile: hdfs://localhost:8020/tmp/hadoop-yarn/staging/houzhizhen/.staging/job_1523876398612_0014/job_1523876398612_0014_1.jhist
// NOTE(review): the next line is pseudo-code, not valid Java; it stands for opening
// historyFile as an FSDataInputStream (presumably through the Hadoop FileSystem API - confirm).
FSDataInputStream in = open historyFile
JobHistoryParser parser = new JobHistoryParser(in);
// parse() is documented as stopping at the first partial record instead of throwing,
// and as closing the input stream on return.
JobInfo jobInfo = parser.parse();
// A non-null parse exception is only logged; the JobInfo obtained above is still used.
Exception parseException = parser.getParseException();
if (parseException != null) {
LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
// Per-task information from the parsed job, keyed by TaskID.
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
/**
* Parse the entire history file and populate the JobInfo object
* The first invocation will populate the object, subsequent calls
* will return the already parsed object.
* The input stream is closed on return
*
* This api ignores partial records and stops parsing on encountering one.
* {@link #getParseException()} can be used to fetch the exception, if any.
*
* @return The populated jobInfo object
* @throws IOException
* @see #getParseException()
*/publicsynchronized JobInfo parse() throws IOException {
return parse(new EventReader(in));
}
// ---- EventReader ----
/**
 * Creates a reader over a job-history event stream.
 *
 * <p>The constructor consumes two lines from {@code in}: the first is the event
 * log version, which must equal {@code EventWriter.VERSION}; the second is the
 * schema string the events were written with. A JSON decoder is then set up on
 * the same stream, using the parsed writer schema, so later reads continue
 * after those two lines.
 *
 * @param in stream positioned at the start of the history file
 * @throws IOException if the version line does not match
 *         {@code EventWriter.VERSION}, if the schema line is missing, or if
 *         Avro rejects the schema (the {@code AvroRuntimeException} is kept as
 *         the cause)
 */
public EventReader(DataInputStream in) throws IOException {
  this.in = in;

  // Line 1: version marker. equals() is called on the constant, so a null
  // version (empty stream) is reported as incompatible rather than an NPE.
  this.version = in.readLine();
  if (!EventWriter.VERSION.equals(version)) {
    throw new IOException("Incompatible event log version: "+version);
  }

  // Schema of the Event class as compiled into this reader.
  Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);

  // Line 2: schema string the file was written with.
  String eventschema = in.readLine();
  if (null == eventschema) {
    throw new IOException("Event schema string not parsed since its null");
  }

  try {
    this.schema = Schema.parse(eventschema);
    // Datum reader is given the file's schema first and this reader's schema second.
    this.reader = new SpecificDatumReader(schema, myschema);
    this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
  } catch (AvroRuntimeException e) {
    throw new IOException(e);
  }
}
// ---- JobHistoryParser ----
/**
 * Parses the events supplied by {@code reader} into a {@code JobInfo}, at most once.
 *
 * <p>If a {@code JobInfo} already exists from an earlier call it is returned
 * unchanged and {@code reader} is not touched.
 *
 * @param reader source of history events
 * @return the job info held by this parser
 * @throws IOException if the delegated {@code parse(reader, this)} call fails
 */
@Private
public synchronized JobInfo parse(EventReader reader) throws IOException {
  if (info != null) {
    return info;
  }

  // info is assigned before parsing starts, so if the call below throws, later
  // invocations still return this (possibly partially populated) object.
  info = new JobInfo();
  // NOTE(review): 'this' is presumably passed as the event handler - confirm
  // against the two-argument overload.
  parse(reader, this);
  return info;
}
public HistoryEvent getNextEvent() throws IOException {
Event wrapper;
try {
wrapper = (Event)reader.read(null, decoder);
} catch (EOFException e) { // at EOFreturnnull;
}
HistoryEvent result;
switch (wrapper.type) {
case JOB_SUBMITTED:
result = new JobSubmittedEvent(); break;
case JOB_INITED:
result = new JobInitedEvent(); break;
case JOB_FINISHED:
result = new JobFinishedEvent(); break;
case JOB_PRIORITY_CHANGED:
result = new JobPriorityChangeEvent(); break;
case JOB_QUEUE_CHANGED:
result = new JobQueueChangeEvent(); break;
case JOB_STATUS_CHANGED:
result = new JobStatusChangedEvent(); break;
case JOB_FAILED:
result = new JobUnsuccessfulCompletionEvent(); break;
case JOB_KILLED:
result = new JobUnsuccessfulCompletionEvent(); break;
case JOB_ERROR:
result = new JobUnsuccessfulCompletionEvent(); break;
case JOB_INFO_CHANGED:
result = new JobInfoChangeEvent(); break;
case TASK_STARTED:
result = new TaskStartedEvent(); break;
case TASK_FINISHED:
result = new TaskFinishedEvent(); break;
case TASK_FAILED:
result = new TaskFailedEvent(); break;
case TASK_UPDATED:
result = new TaskUpdatedEvent(); break;
case MAP_ATTEMPT_STARTED:
result = new TaskAttemptStartedEvent(); break;
case MAP_ATTEMPT_FINISHED:
result = new MapAttemptFinishedEvent(); break;
case MAP_ATTEMPT_FAILED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case MAP_ATTEMPT_KILLED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case REDUCE_ATTEMPT_STARTED:
result = new TaskAttemptStartedEvent(); break;
case REDUCE_ATTEMPT_FINISHED:
result = new ReduceAttemptFinishedEvent(); break;
case REDUCE_ATTEMPT_FAILED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case REDUCE_ATTEMPT_KILLED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case SETUP_ATTEMPT_STARTED:
result = new TaskAttemptStartedEvent(); break;
case SETUP_ATTEMPT_FINISHED:
result = new TaskAttemptFinishedEvent(); break;
case SETUP_ATTEMPT_FAILED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case SETUP_ATTEMPT_KILLED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case CLEANUP_ATTEMPT_STARTED:
result = new TaskAttemptStartedEvent(); break;
case CLEANUP_ATTEMPT_FINISHED:
result = new TaskAttemptFinishedEvent(); break;
case CLEANUP_ATTEMPT_FAILED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case CLEANUP_ATTEMPT_KILLED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case AM_STARTED:
result = new AMStartedEvent(); break;
default:
thrownew RuntimeException("unexpected event type: " + wrapper.type);
}
result.setDatum(wrapper.event);
return result;
}