ExecutionGraph:JobManager(JobMaster) 根据 JobGraph 生成 ExecutionGraph。
ExecutionGraph是JobGraph 的并行化版本,是调度层最核心的数据结构
client 生成 JobGraph 之后,就通过 submitJob 提交给 JobManager, JobManager 会根据
JobGraph 生成对应的 ExecutionGraph。
它包含的主要抽象概念有:
1、ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有
和并发度一样多的 ExecutionVertex。
2、ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输
出是IntermediateResultPartition。
3、IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个
IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
4、IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是
ExecutionVertex,consumer是若干个ExecutionEdge。
5、ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,
target是ExecutionVertex。source和target都只能是一个。
6、Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下,
ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过
ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过
ExecutionAttemptID 来确定消息接收者。
入口:runJob 时在 Dispatcher 创建 JobManagerRunner 时,调用 createJobManagerRunner
-> new JobManagerRunnerImpl -> createJobMasterService -> new JobMaster(){
this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
} -> schedulerNGFactory.createInstance -> new DefaultScheduler -> super
-> public SchedulerBase(this.executionGraph = createAndRestoreExecutionGraph)
-> createExecutionGraph -> ExecutionGraphBuilder.buildGraph
// 核心逻辑: 将拓扑排序过的 JobGraph 添加到 executionGraph 数据结构中。
executionGraph.attachJobGraph(sortedTopology)
/**
 * Attaches the given job vertices to this execution graph.
 *
 * <p>For every {@link JobVertex} an {@link ExecutionJobVertex} is created, connected to the
 * intermediate results already registered in this graph, and then registered itself together
 * with the intermediate results it produces. After all vertices are attached, the execution
 * topology is rebuilt, the failover strategy is notified and the partition release strategy
 * is re-created.
 *
 * @param topologiallySorted the job vertices to attach; the parameter name and the debug
 *     message indicate they are expected in topological order
 * @throws JobException if a job vertex ID or an intermediate data set ID is already registered
 *     in this graph (other causes may be raised by the called constructors/methods)
 */
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
        topologiallySorted.size(),
        tasks.size(),
        intermediateResults.size());

    // ExecutionJobVertex is the node type of the execution graph; collect the ones created here.
    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // Walk over the job vertices and create the parallelized execution-graph node for each one.
    for (JobVertex jobVertex : topologiallySorted) {

        // A single non-stoppable input vertex makes the whole graph non-stoppable.
        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the execution job vertex and attach it to the graph
        // NOTE(review): the literal 1 is presumably the default parallelism - confirm against
        // the ExecutionJobVertex constructor.
        ExecutionJobVertex ejv = new ExecutionJobVertex(
            this,
            jobVertex,
            1,
            maxPriorAttemptsHistoryLength,
            rpcTimeout,
            globalModVersion,
            createTimestamp);

        // Core step: connect the new ExecutionJobVertex to its upstream IntermediateResults.
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            // Arguments follow the order of the placeholders: ID, previous entry, new entry.
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                jobVertex.getID(), previousTask, ejv));
        }

        // Register every result this vertex produces so that later vertices can look it up by ID.
        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                // Arguments follow the order of the placeholders: ID, previous entry, new entry.
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                    res.getId(), previousDataSet, res));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        // The total vertex count grows by the parallelism of the newly attached vertex.
        this.numVerticesTotal += ejv.getParallelism();
        // Remember the new vertex for the failover-strategy notification below.
        newExecJobVertices.add(ejv);
    }

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    failoverStrategy.notifyNewVertices(newExecJobVertices);

    partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
-> ejv.connectToPredecessors(this.intermediateResults);
/**
 * Connects this ExecutionJobVertex to the intermediate results it consumes.
 *
 * <p>For each input {@link JobEdge} of the underlying job vertex, the matching
 * {@link IntermediateResult} is looked up by the edge's source ID, added to this vertex's
 * inputs, a consumer is registered on it, and every parallel {@link ExecutionVertex} of this
 * vertex is connected to it.
 *
 * @param intermediateDataSets the intermediate results known so far, keyed by data set ID
 * @throws JobException if no intermediate result exists for an input edge's source ID
 */
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

    // The input JobEdges of the job vertex this node was created from.
    final List<JobEdge> jobEdges = jobVertex.getInputs();

    if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), jobEdges.size()));
    }

    // Process the input edges one by one; the position in the list is the input number.
    for (int inputIndex = 0; inputIndex < jobEdges.size(); inputIndex++) {
        final JobEdge edge = jobEdges.get(inputIndex);

        if (LOG.isDebugEnabled()) {
            if (edge.getSource() != null) {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
                    inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
            } else {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
                    inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
            }
        }

        // fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
        // in which this method is called for the job vertices is not a topological order
        final IntermediateResult upstreamResult = intermediateDataSets.get(edge.getSourceId());
        if (upstreamResult == null) {
            throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                + edge.getSourceId());
        }

        // Record the result as one of this vertex's inputs.
        this.inputs.add(upstreamResult);

        // Register a consumer on the result and keep the index it hands back.
        final int consumerIndex = upstreamResult.registerConsumer();

        // Every parallel subtask is its own ExecutionVertex, so each one is connected to the result.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            taskVertices[subtask].connectSource(inputIndex, upstreamResult, edge, consumerIndex);
        }
    }
}
-> connectSource
/**
 * Connects this execution vertex to the partitions of the given intermediate result.
 *
 * <p>The edges are built according to the distribution pattern of the job edge, stored as the
 * input edges for {@code inputNumber}, and each edge is then added as a consumer of its source
 * partition.
 *
 * @param inputNumber index of the input being connected; also the slot written in inputEdges
 * @param source the intermediate result whose partitions are consumed
 * @param edge the job edge supplying the distribution pattern
 * @param consumerNumber the consumer index passed on to each source partition
 * @throws RuntimeException if the distribution pattern is neither POINTWISE nor ALL_TO_ALL
 */
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

    // The original note says only forward connections yield POINTWISE and everything else is
    // ALL_TO_ALL. NOTE(review): this cannot be verified from this method - confirm where the
    // pattern is assigned.
    final DistributionPattern distribution = edge.getDistributionPattern();
    final IntermediateResultPartition[] upstreamPartitions = source.getPartitions();

    final ExecutionEdge[] createdEdges;
    switch (distribution) {
        case POINTWISE:
            createdEdges = connectPointwise(upstreamPartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            createdEdges = connectAllToAll(upstreamPartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    inputEdges[inputNumber] = createdEdges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    // Each partition gets the new edge as consumer; the consumer was already registered on the
    // IntermediateResult itself by the caller.
    for (int i = 0; i < createdEdges.length; i++) {
        final ExecutionEdge current = createdEdges[i];
        current.getSource().addConsumer(current, consumerNumber);
    }
}
-> connectAllToAll
/**
 * Builds one {@link ExecutionEdge} per source partition for the given input.
 *
 * @param sourcePartitions the partitions to connect; each one becomes the source of one edge
 * @param inputNumber the input index handed to every created edge
 * @return the created edges, in the same order as {@code sourcePartitions}
 */
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    final ExecutionEdge[] allEdges = new ExecutionEdge[sourcePartitions.length];

    int slot = 0;
    for (IntermediateResultPartition partition : sourcePartitions) {
        // This vertex is passed as the second argument (presumably the edge's target).
        allEdges[slot] = new ExecutionEdge(partition, this, inputNumber);
        slot++;
    }

    return allEdges;
}
看这个方法之前,需要知道, ExecutionVertex 的 inputEdges 变量,是一个二维数组。它
表示了这个 ExecutionVertex 上每一个 input 所包含的 ExecutionEdge 列表。
即,如果 ExecutionVertex 有两个不同的输入:输入 A 和 B。其中输入 A 的 partition=1,
输入 B 的 partition=8,那么这个二维数组 inputEdges 如下(以 irp 代替 IntermediateResultPartition):
[ ExecutionEdge[ A.irp[0] ] ]
[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ] ]
到这里为止, ExecutionGraph 就创建完成了。