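I. JobGraph
Flink has several kinds of graphs. The StreamGraph is used to generate the JobGraph. The JobGraph is then submitted to the Dispatcher, which hands it to a JobManager that builds the ExecutionGraph from it. The ExecutionGraph in turn determines the tasks that are actually deployed and run; that final physical graph is only a logical concept, with no data structure of its own. The JobGraph is a key link in this chain, and the other graphs will be analyzed in detail in later posts. Anyone who has studied graphs knows that what sets a graph apart from other data structures is that it is made of vertices (nodes) and edges.
Here is how Flink defines this graph: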
public class JobGraph implements Serializable {
private static final long serialVersionUID = 1L;
// --- job and configuration ---
/** List of task vertices included in this job graph. */
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();
/** ID of this job. May be set if specific job id is desired (e.g. session management) */
private final JobID jobID;
/** Name of this job. */
private final String jobName;
/** The number of seconds after which the corresponding ExecutionGraph is removed at the
* job manager after it has been executed. */
private long sessionTimeout = 0;
/** flag to enable queued scheduling */
private boolean allowQueuedScheduling;
/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
// --- checkpointing ---
/** Job specific execution config */
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** The settings for the job checkpoints */
private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
// --- attached resources ---
/** Set of JAR files required to run this job. */
private final List<Path> userJars = new ArrayList<Path>();
/** Set of custom files required to run this job. */
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
/** List of classpaths required to run this job. */
private List<URL> classpaths = Collections.emptyList();
// --------------------------------------------------------------------------------------------
/**
* Constructs a new job graph with the given name, the given {@link ExecutionConfig},
* and a random job ID. The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param jobName The name of the job.
*/
public JobGraph(String jobName) {
this(null, jobName);
}
/**
* Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
* the given name and the given execution configuration (see {@link ExecutionConfig}).
* The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
* @param jobName The name of the job.
*/
public JobGraph(JobID jobId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
try {
setExecutionConfig(new ExecutionConfig());
} catch (IOException e) {
// this should never happen, since an empty execution config is always serializable
throw new RuntimeException("bug, empty execution config is not serializable");
}
}
/**
* Constructs a new job graph with no name, a random job ID, the given {@link ExecutionConfig}, and
* the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param vertices The vertices to add to the graph.
*/
public JobGraph(JobVertex... vertices) {
this(null, vertices);
}
/**
* Constructs a new job graph with the given name, the given {@link ExecutionConfig}, a random job ID,
* and the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param jobName The name of the job.
* @param vertices The vertices to add to the graph.
*/
public JobGraph(String jobName, JobVertex... vertices) {
this(null, jobName, vertices);
}
/**
* Constructs a new job graph with the given name, the given {@link ExecutionConfig},
* the given jobId or a random one if null supplied, and the given job vertices.
* The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
* @param jobName The name of the job.
* @param vertices The vertices to add to the graph.
*/
public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
this(jobId, jobName);
for (JobVertex vertex : vertices) {
addVertex(vertex);
}
}
......
/**
* Sets the savepoint restore settings.
* @param settings The savepoint restore settings.
*/
public void setSavepointRestoreSettings(SavepointRestoreSettings settings) {
this.savepointRestoreSettings = checkNotNull(settings, "Savepoint restore settings");
}
/**
* Returns the configured savepoint restore setting.
* @return The configured savepoint restore settings.
*/
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointRestoreSettings;
}
/**
* Sets the execution config. This method eagerly serialized the ExecutionConfig for future RPC
* transport. Further modification of the referenced ExecutionConfig object will not affect
* this serialized copy.
*
* @param executionConfig The ExecutionConfig to be serialized.
* @throws IOException Thrown if the serialization of the ExecutionConfig fails
*/
public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
checkNotNull(executionConfig, "ExecutionConfig must not be null.");
this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
}
/**
* Adds a new task vertex to the job graph if it is not already included.
*
* @param vertex
* the new task vertex to be added
*/
public void addVertex(JobVertex vertex) {
final JobVertexID id = vertex.getID();
JobVertex previous = taskVertices.put(id, vertex);
// if we had a prior association, restore and throw an exception
if (previous != null) {
taskVertices.put(id, previous);
throw new IllegalArgumentException("The JobGraph already contains a vertex with that id.");
}
}
/**
* Returns an Iterable to iterate all vertices registered with the job graph.
*
* @return an Iterable to iterate all vertices registered with the job graph
*/
public Iterable<JobVertex> getVertices() {
return this.taskVertices.values();
}
/**
* Returns an array of all job vertices that are registered with the job graph. The order in which the vertices
* appear in the list is not defined.
*
* @return an array of all job vertices that are registered with the job graph
*/
public JobVertex[] getVerticesAsArray() {
return this.taskVertices.values().toArray(new JobVertex[this.taskVertices.size()]);
}
/**
* Returns the number of all vertices.
*
* @return The number of all vertices.
*/
public int getNumberOfVertices() {
return this.taskVertices.size();
}
/**
* Sets the settings for asynchronous snapshots. A value of {@code null} means that
* snapshotting is not enabled.
*
* @param settings The snapshot settings
*/
public void setSnapshotSettings(JobCheckpointingSettings settings) {
this.snapshotSettings = settings;
}
/**
* Gets the settings for asynchronous snapshots. This method returns null, when
* checkpointing is not enabled.
*
* @return The snapshot settings
*/
public JobCheckpointingSettings getCheckpointingSettings() {
return snapshotSettings;
}
/**
* Checks if the checkpointing was enabled for this job graph
*
* @return true if checkpointing enabled
*/
public boolean isCheckpointingEnabled() {
if (snapshotSettings == null) {
return false;
}
long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
return checkpointInterval > 0 &&
checkpointInterval < Long.MAX_VALUE;
}
/**
* Searches for a vertex with a matching ID and returns it.
*
* @param id
* the ID of the vertex to search for
* @return the vertex with the matching ID or <code>null</code> if no vertex with such ID could be found
*/
public JobVertex findVertexByID(JobVertexID id) {
return this.taskVertices.get(id);
}
/**
* Sets the classpaths required to run the job on a task manager.
*
* @param paths paths of the directories/JAR files required to run the job on a task manager
*/
public void setClasspaths(List<URL> paths) {
classpaths = paths;
}
public List<URL> getClasspaths() {
return classpaths;
}
/**
* Gets the maximum parallelism of all operations in this job graph.
*
* @return The maximum parallelism of this job graph
*/
public int getMaximumParallelism() {
int maxParallelism = -1;
for (JobVertex vertex : taskVertices.values()) {
maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
}
return maxParallelism;
}
// --------------------------------------------------------------------------------------------
// Topological Graph Access
// --------------------------------------------------------------------------------------------
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
// early out on empty lists
if (this.taskVertices.isEmpty()) {
return Collections.emptyList();
}
List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
// start by finding the vertices with no input edges
// and the ones with disconnected inputs (that refer to some standalone data set)
{
Iterator<JobVertex> iter = remaining.iterator();
while (iter.hasNext()) {
JobVertex vertex = iter.next();
if (vertex.hasNoConnectedInputs()) {
sorted.add(vertex);
iter.remove();
}
}
}
int startNodePos = 0;
// traverse from the nodes that were added until we found all elements
while (!remaining.isEmpty()) {
// first check if we have more candidates to start traversing from. if not, then the
// graph is cyclic, which is not permitted
if (startNodePos >= sorted.size()) {
throw new InvalidProgramException("The job graph is cyclic.");
}
JobVertex current = sorted.get(startNodePos++);
addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
}
return sorted;
}
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
// forward traverse over all produced data sets and all their consumers
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
for (JobEdge edge : dataSet.getConsumers()) {
// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
JobVertex v = edge.getTarget();
if (!remaining.contains(v)) {
continue;
}
boolean hasNewPredecessors = false;
for (JobEdge e : v.getInputs()) {
// skip the edge through which we came
if (e == edge) {
continue;
}
IntermediateDataSet source = e.getSource();
if (remaining.contains(source.getProducer())) {
hasNewPredecessors = true;
break;
}
}
if (!hasNewPredecessors) {
target.add(v);
remaining.remove(v);
addNodesThatHaveNoNewPredecessors(v, target, remaining);
}
}
}
}
// --------------------------------------------------------------------------------------------
// Handling of attached JAR files
// --------------------------------------------------------------------------------------------
/**
* Adds the path of a JAR file required to run the job on a task manager.
*
* @param jar
* path of the JAR file required to run the job on a task manager
*/
public void addJar(Path jar) {
if (jar == null) {
throw new IllegalArgumentException();
}
if (!userJars.contains(jar)) {
userJars.add(jar);
}
}
/**
* Gets the list of assigned user jar paths.
*
* @return The list of assigned user jar paths
*/
public List<Path> getUserJars() {
return userJars;
}
/**
* Adds the path of a custom file required to run the job on a task manager.
*
* @param name a name under which this artifact will be accessible through {@link DistributedCache}
* @param file path of a custom file required to run the job on a task manager
*/
public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
if (file == null) {
throw new IllegalArgumentException();
}
userArtifacts.putIfAbsent(name, file);
}
/**
* Gets the list of assigned user jar paths.
*
* @return The list of assigned user jar paths
*/
public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
return userArtifacts;
}
/**
* Adds the BLOB referenced by the key to the JobGraph's dependencies.
*
* @param key
* path of the JAR file required to run the job on a task manager
*/
public void addUserJarBlobKey(PermanentBlobKey key) {
if (key == null) {
throw new IllegalArgumentException();
}
if (!userJarBlobKeys.contains(key)) {
userJarBlobKeys.add(key);
}
}
/**
* Checks whether the JobGraph has user code JAR files attached.
*
* @return True, if the JobGraph has user code JAR files attached, false otherwise.
*/
public boolean hasUsercodeJarFiles() {
return this.userJars.size() > 0;
}
/**
* Returns a set of BLOB keys referring to the JAR files required to run this job.
*
* @return set of BLOB keys referring to the JAR files required to run this job
*/
public List<PermanentBlobKey> getUserJarBlobKeys() {
return this.userJarBlobKeys;
}
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
byte[] serializedBlobKey;
serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
originalEntry.filePath,
originalEntry.isExecutable,
serializedBlobKey,
originalEntry.isZipped
));
}
public void writeUserArtifactEntriesToConfiguration() {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
DistributedCache.writeFileInfoToConfig(
userArtifact.getKey(),
userArtifact.getValue(),
jobConfiguration
);
}
}
}
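getVerticesSortedTopologicallyFromSources and addNodesThatHaveNoNewPredecessors above together implement a topological sort. They first collect every vertex without connected inputs, then walk forward from each already-sorted vertex and append any consumer whose predecessors have all been sorted. If the candidates run out while vertices remain, the graph must contain a cycle. Below is a minimal standalone sketch of the same idea, assuming a simplified model in which vertices are plain String names and an edge is a direct producer-to-consumer link. The names TopoSortSketch, connect and sortFromSources are hypothetical, not Flink API, and IntermediateDataSet and ID-reference edges are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical simplified graph: String vertices, direct producer -> consumer edges. */
class TopoSortSketch {

    // outgoing edges (consumers) and incoming edges (producers) per vertex, in insertion order
    private final Map<String, List<String>> consumers = new LinkedHashMap<>();
    private final Map<String, List<String>> producers = new LinkedHashMap<>();

    void addVertex(String v) {
        consumers.putIfAbsent(v, new ArrayList<>());
        producers.putIfAbsent(v, new ArrayList<>());
    }

    void connect(String from, String to) {
        addVertex(from);
        addVertex(to);
        consumers.get(from).add(to);
        producers.get(to).add(from);
    }

    List<String> sortFromSources() {
        List<String> sorted = new ArrayList<>();
        Set<String> remaining = new LinkedHashSet<>(consumers.keySet());

        // step 1: every vertex without inputs is a starting point
        Iterator<String> it = remaining.iterator();
        while (it.hasNext()) {
            String v = it.next();
            if (producers.get(v).isEmpty()) {
                sorted.add(v);
                it.remove();
            }
        }

        // step 2: expand from the sorted vertices one by one, like startNodePos in JobGraph
        int startPos = 0;
        while (!remaining.isEmpty()) {
            if (startPos >= sorted.size()) {
                // no vertex left to start from, but some are unreachable: there is a cycle
                throw new IllegalStateException("The job graph is cyclic.");
            }
            addNodesWithoutRemainingPredecessors(sorted.get(startPos++), sorted, remaining);
        }
        return sorted;
    }

    private void addNodesWithoutRemainingPredecessors(String start, List<String> target, Set<String> remaining) {
        for (String v : consumers.get(start)) {
            if (!remaining.contains(v)) {
                continue;
            }
            boolean hasRemainingPredecessor = false;
            for (String p : producers.get(v)) {
                if (remaining.contains(p)) {
                    hasRemainingPredecessor = true;
                    break;
                }
            }
            if (!hasRemainingPredecessor) {
                target.add(v);
                remaining.remove(v);
                addNodesWithoutRemainingPredecessors(v, target, remaining);
            }
        }
    }

    public static void main(String[] args) {
        TopoSortSketch g = new TopoSortSketch();
        g.connect("source", "map");
        g.connect("map", "join");
        g.connect("source2", "join");
        g.connect("join", "sink");
        System.out.println(g.sortFromSources()); // [source, source2, map, join, sink]
    }
}
```

Note that "join" is only appended once both of its producers are sorted, which is exactly the hasNewPredecessors check in the Flink code.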
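The job graph is organized as a DAG (directed acyclic graph) whose vertices are connected through intermediate results. In the class definition above, note the Map<JobVertexID, JobVertex> that holds the vertices, the ScheduleMode field (the scheduling mode), and the checkpoint (snapshot) settings. The method addNodesThatHaveNoNewPredecessors also uses JobEdge, which serves as the data input channel of a JobVertex, i.e., an edge of the graph. Now let's look at the definition of JobVertex: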
/**
* The base class for job vertexes.
*/
public class JobVertex implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_NAME = "(unnamed vertex)";
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
/** The ID of the vertex. */
private final JobVertexID id;
/** The alternative IDs of the vertex. */
private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
/** The IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
/** The alternative IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
/** List of produced data sets, one per writer */
private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
/** List of edges with incoming data. One per Reader. */
private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
/** Number of subtasks to split this task into at runtime.*/
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
/** Maximum number of subtasks to split this task into a runtime. */
private int maxParallelism = -1;
/** The minimum resource of the vertex */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
/** The preferred resource of the vertex */
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
/** Custom configuration passed to the assigned task at runtime. */
private Configuration configuration;
/** The class of the invokable. */
private String invokableClassName;
/** Indicates of this job vertex is stoppable or not. */
private boolean isStoppable = false;
/** Optionally, a source of input splits */
private InputSplitSource<?> inputSplitSource;
/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
private String name;
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
private SlotSharingGroup slotSharingGroup;
/** The group inside which the vertex subtasks share slots */
private CoLocationGroup coLocationGroup;
/** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan */
private String operatorName;
/** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
* to be included in the JSON plan */
private String operatorDescription;
/** Optional, pretty name of the operator, to be displayed in the JSON plan */
private String operatorPrettyName;
/** Optional, the JSON for the optimizer properties of the operator result,
* to be included in the JSON plan */
private String resultOptimizerProperties;
/** The input dependency constraint to schedule this vertex. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
// --------------------------------------------------------------------------------------------
/**
* Constructs a new job vertex and assigns it with the given name.
*
* @param name The name of the new job vertex.
*/
public JobVertex(String name) {
this(name, null);
}
/**
* Constructs a new job vertex and assigns it with the given name.
*
* @param name The name of the new job vertex.
* @param id The id of the job vertex.
*/
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
// the id lists must have the same size
this.operatorIDs.add(OperatorID.fromJobVertexID(this.id));
this.operatorIdsAlternatives.add(null);
}
/**
* Constructs a new job vertex and assigns it with the given name.
*
* @param name The name of the new job vertex.
* @param primaryId The id of the job vertex.
* @param alternativeIds The alternative ids of the job vertex.
* @param operatorIds The ids of all operators contained in this job vertex.
* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
*/
public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
this.name = name == null ? DEFAULT_NAME : name;
this.id = primaryId == null ? new JobVertexID() : primaryId;
this.idAlternatives.addAll(alternativeIds);
this.operatorIDs.addAll(operatorIds);
this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
}
......
/**
* Returns the number of inputs.
*
* @return The number of inputs.
*/
public int getNumberOfInputs() {
return this.inputs.size();
}
public List<OperatorID> getOperatorIDs() {
return operatorIDs;
}
public List<OperatorID> getUserDefinedOperatorIDs() {
return operatorIdsAlternatives;
}
/**
* Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
*
* @return the vertex's configuration object
*/
public Configuration getConfiguration() {
if (this.configuration == null) {
this.configuration = new Configuration();
}
return this.configuration;
}
public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
Preconditions.checkNotNull(invokable);
this.invokableClassName = invokable.getName();
this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);
}
/**
* Returns the name of the invokable class which represents the task of this vertex.
*
* @return The name of the invokable class, <code>null</code> if not set.
*/
public String getInvokableClassName() {
return this.invokableClassName;
}
/**
* Returns the invokable class which represents the task of this vertex
*
* @param cl The classloader used to resolve user-defined classes
* @return The invokable class, <code>null</code> if it is not set
*/
public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
if (cl == null) {
throw new NullPointerException("The classloader must not be null.");
}
if (invokableClassName == null) {
return null;
}
try {
return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
}
catch (ClassNotFoundException e) {
throw new RuntimeException("The user-code class could not be resolved.", e);
}
catch (ClassCastException e) {
throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
}
}
/**
* Gets the parallelism of the task.
*
* @return The parallelism of the task.
*/
public int getParallelism() {
return parallelism;
}
/**
* Sets the parallelism for the task.
*
* @param parallelism The parallelism for the task.
*/
public void setParallelism(int parallelism) {
if (parallelism < 1) {
throw new IllegalArgumentException("The parallelism must be at least one.");
}
this.parallelism = parallelism;
}
......
/**
* Sets the minimum and preferred resources for the task.
*
* @param minResources The minimum resource for the task.
* @param preferredResources The preferred resource for the task.
*/
public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
this.minResources = checkNotNull(minResources);
this.preferredResources = checkNotNull(preferredResources);
}
public InputSplitSource<?> getInputSplitSource() {
return inputSplitSource;
}
public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
this.inputSplitSource = inputSplitSource;
}
public List<IntermediateDataSet> getProducedDataSets() {
return this.results;
}
public List<JobEdge> getInputs() {
return this.inputs;
}
/**
* Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
* slot sharing group can run one subtask each in the same slot.
*
* @param grp The slot sharing group to associate the vertex with.
*/
public void setSlotSharingGroup(SlotSharingGroup grp) {
if (this.slotSharingGroup != null) {
this.slotSharingGroup.removeVertexFromGroup(id);
}
this.slotSharingGroup = grp;
if (grp != null) {
grp.addVertexToGroup(id);
}
}
/**
* Gets the slot sharing group that this vertex is associated with. Different vertices in the same
* slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
* a slot sharing group, this method returns {@code null}.
*
* @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
*/
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
......
public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
}
CoLocationGroup thisGroup = this.coLocationGroup;
CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
if (otherGroup == null) {
if (thisGroup == null) {
CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
this.coLocationGroup = group;
strictlyCoLocatedWith.coLocationGroup = group;
}
else {
thisGroup.addVertex(strictlyCoLocatedWith);
strictlyCoLocatedWith.coLocationGroup = thisGroup;
}
}
else {
if (thisGroup == null) {
otherGroup.addVertex(this);
this.coLocationGroup = otherGroup;
}
else {
// both had yet distinct groups, we need to merge them
thisGroup.mergeInto(otherGroup);
}
}
}
public CoLocationGroup getCoLocationGroup() {
return coLocationGroup;
}
public void updateCoLocationGroup(CoLocationGroup group) {
this.coLocationGroup = group;
}
// --------------------------------------------------------------------------------------------
public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
}
public IntermediateDataSet createAndAddResultDataSet(
IntermediateDataSetID id,
ResultPartitionType partitionType) {
IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
this.results.add(result);
return result;
}
public JobEdge connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) {
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
dataSet.addConsumer(edge);
return edge;
}
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
dataSet.addConsumer(edge);
return edge;
}
public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
JobEdge edge = new JobEdge(dataSetId, this, distPattern);
this.inputs.add(edge);
}
// --------------------------------------------------------------------------------------------
// The two methods below show that the inputs of this structure are JobEdges and its outputs are IntermediateDataSets
public boolean isInputVertex() {
return this.inputs.isEmpty();
}
public boolean isOutputVertex() {
return this.results.isEmpty();
}
public boolean hasNoConnectedInputs() {
for (JobEdge edge : inputs) {
if (!edge.isIdReference()) {
return false;
}
}
return true;
}
......
}
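The code shows that the inputs and outputs of a vertex are the JobEdge and IntermediateDataSet data structures, respectively. Internally a vertex also keeps a list of OperatorIDs, which identify the operators contained in (chained into) the vertex. Slot placement is governed by SlotSharingGroup and CoLocationGroup. Next, the definitions of JobEdge and IntermediateDataSet: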
public class JobEdge implements java.io.Serializable {
private static final long serialVersionUID = 1L;
/** The vertex connected to this edge. */
private final JobVertex target;
/** The distribution pattern that should be used for this job edge. */
private final DistributionPattern distributionPattern;
/** The data set at the source of the edge, may be null if the edge is not yet connected*/
private IntermediateDataSet source;
/** The id of the source intermediate data set */
private IntermediateDataSetID sourceId;
/** Optional name for the data shipping strategy (forward, partition hash, rebalance, ...),
* to be displayed in the JSON plan */
private String shipStrategyName;
/** Optional name for the pre-processing operation (sort, combining sort, ...),
* to be displayed in the JSON plan */
private String preProcessingOperationName;
/** Optional description of the caching inside an operator, to be displayed in the JSON plan */
private String operatorLevelCachingDescription;
/**
* Constructs a new job edge, that connects an intermediate result to a consumer task.
*
* @param source The data set that is at the source of this edge.
* @param target The operation that is at the target of this edge.
* @param distributionPattern The pattern that defines how the connection behaves in parallel.
*/
public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern) {
if (source == null || target == null || distributionPattern == null) {
throw new NullPointerException();
}
this.target = target;
this.distributionPattern = distributionPattern;
this.source = source;
this.sourceId = source.getId();
}
/**
* Constructs a new job edge that refers to an intermediate result via the Id, rather than directly through
* the intermediate data set structure.
*
* @param sourceId The id of the data set that is at the source of this edge.
* @param target The operation that is at the target of this edge.
* @param distributionPattern The pattern that defines how the connection behaves in parallel.
*/
public JobEdge(IntermediateDataSetID sourceId, JobVertex target, DistributionPattern distributionPattern) {
if (sourceId == null || target == null || distributionPattern == null) {
throw new NullPointerException();
}
this.target = target;
this.distributionPattern = distributionPattern;
this.sourceId = sourceId;
}
/**
* Returns the data set at the source of the edge. May be null, if the edge refers to the source via an ID
* and has not been connected.
*
* @return The data set at the source of the edge
*/
public IntermediateDataSet getSource() {
return source;
}
/**
* Returns the vertex connected to this edge.
*
* @return The vertex connected to this edge.
*/
public JobVertex getTarget() {
return target;
}
/**
* Returns the distribution pattern used for this edge.
*
* @return The distribution pattern used for this edge.
*/
public DistributionPattern getDistributionPattern(){
return this.distributionPattern;
}
/**
* Gets the ID of the consumed data set.
*
* @return The ID of the consumed data set.
*/
public IntermediateDataSetID getSourceId() {
return sourceId;
}
public boolean isIdReference() {
return this.source == null;
}
// --------------------------------------------------------------------------------------------
public void connecDataSet(IntermediateDataSet dataSet) {
if (dataSet == null) {
throw new NullPointerException();
}
if (this.source != null) {
throw new IllegalStateException("The edge is already connected.");
}
if (!dataSet.getId().equals(sourceId)) {
throw new IllegalArgumentException("The data set to connect does not match the sourceId.");
}
this.source = dataSet;
}
......
}
public class IntermediateDataSet implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private final IntermediateDataSetID id; // the identifier
private final JobVertex producer; // the operation that produced this data set
private final List<JobEdge> consumers = new ArrayList<JobEdge>();
// The type of partition to use at runtime
private final ResultPartitionType resultType;
// --------------------------------------------------------------------------------------------
public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
this.id = checkNotNull(id);
this.producer = checkNotNull(producer);
this.resultType = checkNotNull(resultType);
}
// --------------------------------------------------------------------------------------------
public IntermediateDataSetID getId() {
return id;
}
public JobVertex getProducer() {
return producer;
}
public List<JobEdge> getConsumers() {
return this.consumers;
}
public ResultPartitionType getResultType() {
return resultType;
}
// --------------------------------------------------------------------------------------------
public void addConsumer(JobEdge edge) {
this.consumers.add(edge);
}
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
return "Intermediate Data Set (" + id + ")";
}
}
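Putting the three classes together: connectNewDataSetAsInput creates an IntermediateDataSet on the producing vertex, creates a JobEdge from that data set to the consuming vertex, and registers the edge on both sides (the consumer's inputs and the data set's consumers). The sketch below models only that wiring, using hypothetical stand-in classes Vertex, DataSet and Edge; IDs, DistributionPattern and ResultPartitionType are omitted. It then walks the chain vertex -> data set -> edge -> vertex from the source to the sink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for JobVertex, IntermediateDataSet and JobEdge (topology only). */
class GraphWiringSketch {

    static final class Vertex {
        final String name;
        final List<DataSet> results = new ArrayList<>(); // outputs, like JobVertex.results
        final List<Edge> inputs = new ArrayList<>();     // inputs, like JobVertex.inputs

        Vertex(String name) { this.name = name; }

        /** Mirrors createAndAddResultDataSet: the producer owns its result data set. */
        DataSet createAndAddResultDataSet() {
            DataSet ds = new DataSet(this);
            results.add(ds);
            return ds;
        }

        /** Mirrors connectNewDataSetAsInput: input -> data set -> edge -> this vertex. */
        Edge connectNewDataSetAsInput(Vertex input) {
            DataSet ds = input.createAndAddResultDataSet();
            Edge edge = new Edge(ds, this);
            inputs.add(edge);
            ds.consumers.add(edge);
            return edge;
        }

        boolean isInputVertex() { return inputs.isEmpty(); }
        boolean isOutputVertex() { return results.isEmpty(); }
    }

    /** One producer, any number of consuming edges. */
    static final class DataSet {
        final Vertex producer;
        final List<Edge> consumers = new ArrayList<>();
        DataSet(Vertex producer) { this.producer = producer; }
    }

    /** Connects exactly one data set to exactly one target vertex. */
    static final class Edge {
        final DataSet source;
        final Vertex target;
        Edge(DataSet source, Vertex target) { this.source = source; this.target = target; }
    }

    public static void main(String[] args) {
        Vertex source = new Vertex("source");
        Vertex map = new Vertex("map");
        Vertex sink = new Vertex("sink");
        map.connectNewDataSetAsInput(source);
        sink.connectNewDataSetAsInput(map);

        // follow the first output of each vertex until reaching a vertex with no outputs
        Vertex current = source;
        StringBuilder path = new StringBuilder(current.name);
        while (!current.isOutputVertex()) {
            current = current.results.get(0).consumers.get(0).target;
            path.append(" -> ").append(current.name);
        }
        System.out.println(path); // source -> map -> sink
    }
}
```

Going backwards works the same way through edge.source.producer, which is the path addNodesThatHaveNoNewPredecessors uses to check predecessors.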
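As you can see, these classes all reference one another, and all of them implement the Serializable interface. The reason is not that they "operate on data": the JobGraph, together with its vertices, edges and intermediate data sets, must be serialized so that it can be shipped from the client to the cluster and persisted for recovery (see the SubmittedJobGraphStore below).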
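Because vertices, data sets and edges point at each other in both directions, the serialized form has to preserve shared references and cycles. Standard Java serialization does this within a single writeObject call. The small sketch below uses a hypothetical Node class in place of the Flink types and shows that a back reference survives a round trip as the same object rather than as a duplicate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SerializationSketch {

    /** Hypothetical node with links in both directions, like JobVertex <-> JobEdge <-> IntermediateDataSet. */
    static final class Node implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final List<Node> downstream = new ArrayList<>();
        final List<Node> upstream = new ArrayList<>();
        Node(String name) { this.name = name; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Node a = new Node("a");
        Node b = new Node("b");
        a.downstream.add(b);
        b.upstream.add(a); // back reference: the object graph now contains a cycle

        Node copyA = (Node) deserialize(serialize(a));
        Node copyB = copyA.downstream.get(0);
        // the cycle is rebuilt with shared identity: b's upstream is the copy of a itself
        System.out.println(copyB.name + " " + (copyB.upstream.get(0) == copyA)); // b true
    }
}
```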
II. The Dispatcher
Let's look at how the Dispatcher is created in standalone mode.
The creation sequence is: StandaloneSessionClusterEntrypoint -> SessionDispatcherResourceManagerComponentFactory -> createDispatcherResourceManagerComponent -> createDispatcher (StandaloneDispatcher)
public class StandaloneDispatcher extends Dispatcher {
public StandaloneDispatcher(
RpcService rpcService,
String endpointId,
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist) throws Exception {
super(
rpcService,
endpointId,
configuration,
highAvailabilityServices,
highAvailabilityServices.getSubmittedJobGraphStore(), // note this call
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
historyServerArchivist);
}
}
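The store handed to the Dispatcher here keeps submitted job graphs so that they can be recovered later, for example after a JobManager failover. To make the contract of the interface listed next more concrete, here is a minimal in-memory sketch. The class InMemoryJobGraphStore is hypothetical: it uses Strings instead of JobID and SubmittedJobGraph, and it ignores locking (releaseJobGraph), start/stop and the listener used to coordinate multiple JobManagers. It only mirrors two behaviors documented in the interface: putJobGraph replaces an existing graph with the same ID, and recoverJobGraph returns null when no job was registered.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a submitted-job-graph store (not Flink API). */
class InMemoryJobGraphStore {

    // job id -> job graph; Strings stand in for JobID and SubmittedJobGraph
    private final Map<String, String> graphs = new LinkedHashMap<>();

    /** Like putJobGraph: an existing graph with the same id is replaced. */
    synchronized void putJobGraph(String jobId, String jobGraph) {
        graphs.put(jobId, jobGraph);
    }

    /** Like recoverJobGraph: returns null if no job was registered under this id. */
    synchronized String recoverJobGraph(String jobId) {
        return graphs.get(jobId);
    }

    /** Like removeJobGraph: removes the graph if it exists, otherwise does nothing. */
    synchronized void removeJobGraph(String jobId) {
        graphs.remove(jobId);
    }

    /** Like getJobIds: returns a snapshot copy of all stored job ids. */
    synchronized Collection<String> getJobIds() {
        return new ArrayList<>(graphs.keySet());
    }

    public static void main(String[] args) {
        InMemoryJobGraphStore store = new InMemoryJobGraphStore();
        store.putJobGraph("job-1", "graph v1");
        store.putJobGraph("job-1", "graph v2"); // same id: replaced, not duplicated
        store.putJobGraph("job-2", "graph x");
        store.removeJobGraph("job-2");
        System.out.println(store.getJobIds() + " " + store.recoverJobGraph("job-1")); // [job-1] graph v2
    }
}
```

A real highly available implementation would additionally have to persist the graphs outside the JobManager process, which is why the objects involved must be serializable.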
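getSubmittedJobGraphStore returns the SubmittedJobGraphStore, in which submitted JobGraphs are stored so that they can be recovered later. Here is the definition of this interface: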
public interface SubmittedJobGraphStore {
/**
* Starts the {@link SubmittedJobGraphStore} service.
*/
void start(SubmittedJobGraphListener jobGraphListener) throws Exception;
/**
* Stops the {@link SubmittedJobGraphStore} service.
*/
void stop() throws Exception;
/**
* Returns the {@link SubmittedJobGraph} with the given {@link JobID} or
* {@code null} if no job was registered.
*/
@Nullable
SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;
/**
* Adds the {@link SubmittedJobGraph} instance.
*
* <p>If a job graph with the same {@link JobID} exists, it is replaced.
*/
void putJobGraph(SubmittedJobGraph jobGraph) throws Exception;
/**
* Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists.
*/
void removeJobGraph(JobID jobId) throws Exception;
/**
* Releases the locks on the specified {@link JobGraph}.
*
* Releasing the locks allows that another instance can delete the job from
* the {@link SubmittedJobGraphStore}.
*
* @param jobId specifying the job to release the locks for
* @throws Exception if the locks cannot be released
*/
void releaseJobGraph(JobID jobId) throws Exception;
/**
* Get all job ids of submitted job graphs to the submitted job graph store.
*
* @return Collection of submitted job ids
* @throws Exception if the operation fails
*/
Collection<JobID> getJobIds() throws Exception;
/**
* A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
* multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
*/
interface SubmittedJobGraphListener {
/**
* Callback for {@link SubmittedJobGraph} instances added by a different {@link
* SubmittedJobGraphStore} instance.
*
* <p><strong>Important:</strong> It is possible to get false positives and be notified
* about a job graph, which was added by this instance.
*
* @param jobId The {@link JobID} of the added job graph
*/
void onAddedJobGraph(JobID jobId);
/**
* Callback for {@link SubmittedJobGraph} instances removed by a different {@link
* SubmittedJobGraphStore} instance.
*
* @param jobId The {@link JobID} of the removed job graph
*/
void onRemovedJobGraph(JobID jobId);
}
}
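In other words, this store is created together with the HA services analyzed earlier. It is the service that manages JobGraphs, and it is how the Dispatcher takes control of them. The Dispatcher starts the store in startDispatcherServices() (submittedJobGraphStore.start(this)) and registers itself as the listener. It persists a JobGraph on submission, recovers the persisted graphs after gaining leadership, and removes or releases a graph when its job terminates.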
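The contract of SubmittedJobGraphStore can be illustrated with a small in-memory sketch. This is not Flink's implementation, and the simplifications are assumptions made for illustration:

* Job IDs and job graphs are plain Strings.
* SharedBackend is a hypothetical stand-in for the shared HA storage.
* Everything runs synchronously on one thread.

The sketch shows three behaviours:

* putJobGraph replaces an existing entry.
* recoverJobGraph returns null for an unknown job.
* Listener callbacks report changes made by the *other* store instances. The real interface also allows false positives, which this sketch never produces.

persistAndRun loosely mirrors Dispatcher#persistAndRunJob below, minus the futures. It persists the graph first and removes it again if starting the job fails.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

public class JobGraphStoreSketch {

    // Simplified counterpart of SubmittedJobGraphStore.SubmittedJobGraphListener.
    interface Listener {
        void onAddedJobGraph(String jobId);
        void onRemovedJobGraph(String jobId);
    }

    // Hypothetical stand-in for shared HA storage; the graph map is not thread-safe.
    static class SharedBackend {
        final Map<String, String> graphs = new LinkedHashMap<>();
        final List<Store> stores = new CopyOnWriteArrayList<>();
    }

    // One store instance per (simulated) dispatcher, all sharing one backend.
    static class Store {
        private final SharedBackend backend;
        private Listener listener;

        Store(SharedBackend backend) { this.backend = backend; }

        void start(Listener listener) {
            this.listener = listener;
            backend.stores.add(this);
        }

        void stop() {
            backend.stores.remove(this);
            this.listener = null;
        }

        // Like recoverJobGraph: null if no graph is registered under jobId.
        String recoverJobGraph(String jobId) { return backend.graphs.get(jobId); }

        // Like putJobGraph: a graph with the same job ID is replaced.
        void putJobGraph(String jobId, String jobGraph) {
            backend.graphs.put(jobId, jobGraph);
            notifyOthers(jobId, true);
        }

        void removeJobGraph(String jobId) {
            if (backend.graphs.remove(jobId) != null) {
                notifyOthers(jobId, false);
            }
        }

        // Nothing is locked in this in-memory sketch, so releasing is a no-op.
        void releaseJobGraph(String jobId) { }

        Collection<String> getJobIds() { return new ArrayList<>(backend.graphs.keySet()); }

        // Only the other started instances are told about a change.
        private void notifyOthers(String jobId, boolean added) {
            for (Store other : backend.stores) {
                Listener l = other.listener;
                if (other == this || l == null) continue;
                if (added) l.onAddedJobGraph(jobId); else l.onRemovedJobGraph(jobId);
            }
        }
    }

    // Records callbacks so they can be inspected.
    static class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();
        public void onAddedJobGraph(String jobId) { events.add("added " + jobId); }
        public void onRemovedJobGraph(String jobId) { events.add("removed " + jobId); }
    }

    // Persist, then run; if running fails, remove the graph again and rethrow.
    static void persistAndRun(Store store, String jobId, String jobGraph, Runnable runJob) {
        store.putJobGraph(jobId, jobGraph);
        try {
            runJob.run();
        } catch (RuntimeException e) {
            store.removeJobGraph(jobId);
            throw e;
        }
    }

    public static void main(String[] args) {
        SharedBackend backend = new SharedBackend();
        Store a = new Store(backend);
        Store b = new Store(backend);
        RecordingListener seenByB = new RecordingListener();
        a.start(new RecordingListener());
        b.start(seenByB);

        persistAndRun(a, "job-1", "graph-1", () -> { });
        try {
            persistAndRun(a, "job-2", "graph-2", () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // job-2 has been removed from the store again
        }
        System.out.println(a.getJobIds());              // [job-1]
        System.out.println(b.recoverJobGraph("job-2")); // null
        System.out.println(seenByB.events);             // [added job-1, added job-2, removed job-2]
    }
}
```

Keeping the listener notification out of the writing instance mirrors the purpose stated in the interface Javadoc: reacting to races between several dispatchers that share the same HA storage.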
3. Dispatching the Job Graph
So how is the job graph dispatched? Let's look at the definition of the abstract class Dispatcher, the base class of all dispatchers:
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
public static final String DISPATCHER_NAME = "dispatcher";
private final Configuration configuration;
private final SubmittedJobGraphStore submittedJobGraphStore;
private final RunningJobsRegistry runningJobsRegistry;
private final HighAvailabilityServices highAvailabilityServices;
private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
private final JobManagerSharedServices jobManagerSharedServices;
private final HeartbeatServices heartbeatServices;
private final BlobServer blobServer;
private final FatalErrorHandler fatalErrorHandler;
private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
private final LeaderElectionService leaderElectionService;
private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
private final JobManagerRunnerFactory jobManagerRunnerFactory;
private final JobManagerMetricGroup jobManagerMetricGroup;
private final HistoryServerArchivist historyServerArchivist;
@Nullable
private final String metricQueryServicePath;
private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
public Dispatcher(
RpcService rpcService,
String endpointId,
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
SubmittedJobGraphStore submittedJobGraphStore,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricServiceQueryPath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist) throws Exception {
super(rpcService, endpointId);
this.configuration = Preconditions.checkNotNull(configuration);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
this.blobServer = Preconditions.checkNotNull(blobServer);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore);
this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup);
this.metricQueryServicePath = metricServiceQueryPath;
this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(
configuration,
this.blobServer);
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
jobManagerRunnerFutures = new HashMap<>(16);
leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist);
this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
this.jobManagerTerminationFutures = new HashMap<>(2);
}
//------------------------------------------------------
// Lifecycle methods
//------------------------------------------------------
@Override
public void onStart() throws Exception {
try {
startDispatcherServices();
} catch (Exception e) {
final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
onFatalError(exception);
throw exception;
}
}
private void startDispatcherServices() throws Exception {
try {
submittedJobGraphStore.start(this);
leaderElectionService.start(this);
registerDispatcherMetrics(jobManagerMetricGroup);
} catch (Exception e) {
handleStartDispatcherServicesException(e);
}
}
private void handleStartDispatcherServicesException(Exception e) throws Exception {
try {
stopDispatcherServices();
} catch (Exception exception) {
e.addSuppressed(exception);
}
throw e;
}
@Override
public CompletableFuture<Void> onStop() {
log.info("Stopping dispatcher {}.", getAddress());
final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
return FutureUtils.runAfterwards(
allJobManagerRunnersTerminationFuture,
() -> {
stopDispatcherServices();
log.info("Stopped dispatcher {}.", getAddress());
});
}
private void stopDispatcherServices() throws Exception {
Exception exception = null;
try {
jobManagerSharedServices.shutdown();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
try {
submittedJobGraphStore.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
try {
leaderElectionService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
jobManagerMetricGroup.close();
ExceptionUtils.tryRethrowException(exception);
}
//------------------------------------------------------
// RPCs
//------------------------------------------------------
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
try {
if (isDuplicateJob(jobGraph.getJobID())) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
} else {
return internalSubmitJob(jobGraph);
}
} catch (FlinkException e) {
return FutureUtils.completedExceptionally(e);
}
}
/**
* Checks whether the given job has already been submitted or executed.
*
* @param jobId identifying the submitted job
* @return true if the job has already been submitted (is running) or has been executed
* @throws FlinkException if the job scheduling status cannot be retrieved
*/
private boolean isDuplicateJob(JobID jobId) throws FlinkException {
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
try {
jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
} catch (IOException e) {
throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e);
}
return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId);
}
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());
return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
if (throwable != null) {
cleanUpJobData(jobGraph.getJobID(), true);
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
throw new CompletionException(
new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
} else {
return acknowledge;
}
}, getRpcService().getExecutor());
}
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph));
final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
if (throwable != null) {
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
}
}));
}
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
return jobManagerRunnerFuture
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
if (throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
},
getMainThreadExecutor());
}
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
final RpcService rpcService = getRpcService();
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() ->
jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler)),
rpcService.getExecutor());
return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
if (strippedThrowable instanceof JobNotFinishedException) {
jobNotFinished(jobId);
} else {
jobMasterFailed(jobId, strippedThrowable);
}
}
} else {
log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
}
}, getMainThreadExecutor());
jobManagerRunner.start();
return jobManagerRunner;
}
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
}
@Override
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) {
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
return CompletableFuture.supplyAsync(
() -> {
log.info("Disposing savepoint {}.", savepointPath);
try {
Checkpoints.disposeSavepoint(savepointPath, configuration, classLoader, log);
} catch (IOException | FlinkException e) {
throw new CompletionException(new FlinkException(String.format("Could not dispose savepoint %s.", savepointPath), e));
}
return Acknowledge.get();
},
jobManagerSharedServices.getScheduledExecutorService());
}
@Override
public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout));
}
@Override
public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.stop(timeout));
}
@Override
public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
return jobMasterGatewayFuture.thenCompose(
(JobMasterGateway jobMasterGateway) ->
jobMasterGateway.rescaleJob(newParallelism, rescalingBehaviour, timeout));
}
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
CompletableFuture<Collection<JobStatus>> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
final JobsOverview completedJobsOverview = archivedExecutionGraphStore.getStoredJobsOverview();
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
(Collection<JobStatus> runningJobsStatus, ResourceOverview resourceOverview) -> {
final JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
return new ClusterOverview(resourceOverview, allJobsOverview);
});
}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails = queryJobMastersForInformation(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobDetails(timeout));
CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails = FutureUtils.combineAll(
individualOptionalJobDetails);
CompletableFuture<Collection<JobDetails>> combinedJobDetails = optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
final Collection<JobDetails> completedJobDetails = archivedExecutionGraphStore.getAvailableJobDetails();
return combinedJobDetails.thenApply(
(Collection<JobDetails> runningJobDetails) -> {
final Collection<JobDetails> allJobDetails = new ArrayList<>(completedJobDetails.size() + runningJobDetails.size());
allJobDetails.addAll(runningJobDetails);
allJobDetails.addAll(completedJobDetails);
return new MultipleJobsDetails(allJobDetails);
});
}
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
final CompletableFuture<JobStatus> jobStatusFuture = jobMasterGatewayFuture.thenCompose(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
return jobStatusFuture.exceptionally(
(Throwable throwable) -> {
final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
// check whether it is a completed job
if (jobDetails == null) {
throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
} else {
return jobDetails.getStatus();
}
});
}
@Override
public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
final JobID jobId,
final JobVertexID jobVertexId) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId));
}
@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
final CompletableFuture<ArchivedExecutionGraph> archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJob(timeout));
return archivedExecutionGraphFuture.exceptionally(
(Throwable throwable) -> {
final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId);
// check whether it is a completed job
if (serializableExecutionGraph == null) {
throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
} else {
return serializableExecutionGraph;
}
});
}
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
if (jobManagerRunnerFuture == null) {
final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
if (archivedExecutionGraph == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
}
} else {
return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
}
}
@Override
public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
if (metricQueryServicePath != null) {
return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
} else {
return CompletableFuture.completedFuture(Collections.emptyList());
}
}
@Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
return runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout));
}
@Override
public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
return CompletableFuture.completedFuture(blobServer.getPort());
}
@Override
public CompletableFuture<String> triggerSavepoint(
final JobID jobId,
final String targetDirectory,
final boolean cancelJob,
final Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
return jobMasterGatewayFuture.thenCompose(
(JobMasterGateway jobMasterGateway) ->
jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
}
@Override
public CompletableFuture<Acknowledge> shutDownCluster() {
closeAsync();
return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
* Cleans up the job related data from the dispatcher. If cleanupHA is true, then
* the data will also be removed from HA.
*
* @param jobId JobID identifying the job to clean up
* @param cleanupHA True iff HA data shall also be cleaned up
*/
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
}
private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
// clean up the pending termination future
jobManagerRunnerTerminationFuture.thenRunAsync(
() -> {
final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
//noinspection ObjectEquality
if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
jobManagerTerminationFutures.put(jobId, terminationFuture);
}
},
getUnfencedMainThreadExecutor());
}
private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
if (jobManagerRunnerFuture != null) {
jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
} else {
jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
}
return jobManagerRunnerTerminationFuture.thenRunAsync(
() -> cleanUpJobData(jobId, cleanupHA),
getRpcService().getExecutor());
}
private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
jobManagerMetricGroup.removeJob(jobId);
boolean cleanupHABlobs = false;
if (cleanupHA) {
try {
submittedJobGraphStore.removeJobGraph(jobId);
// only clean up the HA blobs if we could remove the job from HA storage
cleanupHABlobs = true;
} catch (Exception e) {
log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
}
try {
runningJobsRegistry.clearJob(jobId);
} catch (IOException e) {
log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
}
} else {
try {
submittedJobGraphStore.releaseJobGraph(jobId);
} catch (Exception e) {
log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
}
}
blobServer.cleanupJob(jobId, cleanupHABlobs);
}
/**
* Terminate all currently running {@link JobManagerRunner}.
*/
private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());
for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
}
}
private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
terminateJobManagerRunners();
final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
return FutureUtils.completeAll(values);
}
/**
* Recovers all jobs persisted via the submitted job graph store.
*/
@VisibleForTesting
Collection<JobGraph> recoverJobs() throws Exception {
log.info("Recovering all persisted jobs.");
final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
try {
return recoverJobGraphs(jobIds);
} catch (Exception e) {
// release all recovered job graphs
for (JobID jobId : jobIds) {
try {
submittedJobGraphStore.releaseJobGraph(jobId);
} catch (Exception ie) {
e.addSuppressed(ie);
}
}
throw e;
}
}
@Nonnull
private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
for (JobID jobId : jobIds) {
final JobGraph jobGraph = recoverJob(jobId);
if (jobGraph == null) {
throw new FlinkJobNotFoundException(jobId);
}
jobGraphs.add(jobGraph);
}
return jobGraphs;
}
@Nullable
private JobGraph recoverJob(JobID jobId) throws Exception {
log.debug("Recover job {}.", jobId);
final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
if (submittedJobGraph != null) {
return submittedJobGraph.getJobGraph();
} else {
return null;
}
}
protected void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}
protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
Preconditions.checkArgument(
archivedExecutionGraph.getState().isGloballyTerminalState(),
"Job %s is in state %s which is not globally terminal.",
archivedExecutionGraph.getJobID(),
archivedExecutionGraph.getState());
log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
archiveExecutionGraph(archivedExecutionGraph);
final JobID jobId = archivedExecutionGraph.getJobID();
removeJobAndRegisterTerminationFuture(jobId, true);
}
private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
try {
archivedExecutionGraphStore.put(archivedExecutionGraph);
} catch (IOException e) {
log.info(
"Could not store completed job {}({}).",
archivedExecutionGraph.getJobName(),
archivedExecutionGraph.getJobID(),
e);
}
final CompletableFuture<Acknowledge> executionGraphFuture = historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
executionGraphFuture.whenComplete(
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
log.info(
"Could not archive completed job {}({}) to the history server.",
archivedExecutionGraph.getJobName(),
archivedExecutionGraph.getJobID(),
throwable);
}
});
}
protected void jobNotFinished(JobID jobId) {
log.info("Job {} was not finished by JobManager.", jobId);
removeJobAndRegisterTerminationFuture(jobId, false);
}
private void jobMasterFailed(JobID jobId, Throwable cause) {
// we fail fatally in case of a JobMaster failure in order to restart the
// dispatcher to recover the jobs again. This only works in HA mode, though
onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
}
private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
if (jobManagerRunnerFuture == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
return leaderGatewayFuture.thenApplyAsync(
(JobMasterGateway jobMasterGateway) -> {
// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
if (jobManagerRunnerFutures.containsKey(jobId)) {
return jobMasterGateway;
} else {
throw new CompletionException(new FlinkJobNotFoundException(jobId));
}
},
getMainThreadExecutor());
}
}
private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
return resourceManagerGatewayRetriever.getFuture();
}
private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
return getResourceManagerGateway().thenApply(resourceManagerCommand).thenCompose(Function.identity());
}
private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
}
@Nonnull
private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
final int numberJobsRunning = jobManagerRunnerFutures.size();
ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
numberJobsRunning);
for (JobID jobId : jobManagerRunnerFutures.keySet()) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
.thenCompose(queryFunction::apply)
.handle((T value, Throwable throwable) -> Optional.ofNullable(value));
optionalJobInformation.add(optionalRequest);
}
return optionalJobInformation;
}
//------------------------------------------------------
// Leader contender
//------------------------------------------------------
/**
* Callback method when current resourceManager is granted leadership.
*
* @param newLeaderSessionID unique leadershipID
*/
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
runAsyncWithoutFencing(
() -> {
log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
getRpcService().getExecutor());
final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
getUnfencedMainThreadExecutor());
final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
recoveredJobsFuture,
BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
if (confirmLeadership) {
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
} else {
for (JobGraph recoveredJob : recoveredJobs) {
submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
}
}
return null;
}),
getRpcService().getExecutor());
confirmationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
onFatalError(
new DispatcherException(
String.format("Failed to take leadership with session id %s.", newLeaderSessionID),
(ExceptionUtils.stripCompletionException(throwable))));
}
});
recoveryOperation = confirmationFuture;
});
}
private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
setNewFencingToken(dispatcherId);
Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size());
for (JobGraph recoveredJob : recoveredJobs) {
final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
runFutures.add(runFuture);
}
return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
} else {
log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
return CompletableFuture.completedFuture(false);
}
}
private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
new DispatcherException(
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
throwable)); });
return jobManagerTerminationFuture.thenComposeAsync(
FunctionUtils.uncheckedFunction((ignored) -> {
jobManagerTerminationFutures.remove(jobId);
return action.apply(jobGraph);
}),
getMainThreadExecutor());
}
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
if (jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
}
}
    @VisibleForTesting
    CompletableFuture<Void> getRecoveryOperation() {
        return recoveryOperation;
    }
    private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
        // clear the state if we've been the leader before
        if (getFencingToken() != null) {
            clearDispatcherState();
        }
        setFencingToken(dispatcherId);
    }
    private void clearDispatcherState() {
        terminateJobManagerRunners();
    }
    private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
        jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS,
            () -> (long) jobManagerRunnerFutures.size());
    }
    /**
     * Callback method when the current dispatcher loses leadership.
     */
    @Override
    public void revokeLeadership() {
        runAsyncWithoutFencing(
            () -> {
                log.info("Dispatcher {} was revoked leadership.", getAddress());
                setNewFencingToken(null);
            });
    }
    /**
     * Handles error occurring in the leader election service.
     *
     * @param exception Exception being thrown in the leader election service
     */
    @Override
    public void handleError(final Exception exception) {
        onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
    }
    //------------------------------------------------------
    // SubmittedJobGraphListener
    //------------------------------------------------------
    @Override
    public void onAddedJobGraph(final JobID jobId) {
        runAsync(
            () -> {
                if (!jobManagerRunnerFutures.containsKey(jobId)) {
                    // IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
                    // the specified job is already removed from the SubmittedJobGraphStore. In this case,
                    // SubmittedJobGraphStore.recoverJob returns null.
                    final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
                        FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
                        getRpcService().getExecutor());
                    final DispatcherId dispatcherId = getFencingToken();
                    final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
                        (Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
                            FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
                                FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
                                    if (!isRecoveredJobRunning) {
                                        submittedJobGraphStore.releaseJobGraph(jobId);
                                    }
                                }),
                                getRpcService().getExecutor())))
                            .orElse(CompletableFuture.completedFuture(null)),
                        getUnfencedMainThreadExecutor());
                    submissionFuture.whenComplete(
                        (Void ignored, Throwable throwable) -> {
                            if (throwable != null) {
                                onFatalError(
                                    new DispatcherException(
                                        String.format("Could not start the added job %s", jobId),
                                        ExceptionUtils.stripCompletionException(throwable)));
                            }
                        });
                    recoveryOperation = submissionFuture;
                }
            });
    }
    private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
        if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
            final JobID jobId = jobGraph.getJobID();
            if (jobManagerRunnerFutures.containsKey(jobId)) {
                // we must not release the job graph lock since it can only be locked once and
                // is currently being executed. Once we support multiple locks, we must release
                // the JobGraph here
                log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
                return CompletableFuture.completedFuture(true);
            } else if (runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
                return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
            } else {
                log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
            }
        }
        return CompletableFuture.completedFuture(false);
    }
    @Override
    public void onRemovedJobGraph(final JobID jobId) {
        runAsync(() -> {
            try {
                removeJobAndRegisterTerminationFuture(jobId, false);
            } catch (final Exception e) {
                onFatalError(new DispatcherException(String.format("Could not remove job %s.", jobId), e));
            }
        });
    }
}
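In the listing above, getJobTerminationFuture and waitForTerminatingJobManager together form a gate: a JobGraph is only (re)run after any previous JobManager for the same job id has terminated, and the submission fails if that job is still running. The following is a minimal sketch of that gate using only java.util.concurrent, under stated assumptions: TerminationGate, runAfterTermination and the two maps are hypothetical names, String ids stand in for JobID, IllegalStateException stands in for DispatcherException, and the chain uses thenCompose on the completing thread, whereas the Dispatcher uses thenComposeAsync on its main-thread executor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical, simplified model of the Dispatcher's "wait for the previous JobManager" gate.
// Not Flink API: String ids stand in for JobID, IllegalStateException for DispatcherException.
class TerminationGate {
    // Jobs that still have a running JobManager (stand-in for jobManagerRunnerFutures).
    final Map<String, Object> runningJobs = new HashMap<>();
    // Termination futures of JobManagers that are shutting down (stand-in for jobManagerTerminationFutures).
    final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

    // Like getJobTerminationFuture: fail if the job is still running, otherwise return the
    // pending termination future, or an already-completed one if nothing is terminating.
    CompletableFuture<Void> terminationFuture(String jobId) {
        if (runningJobs.containsKey(jobId)) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Job " + jobId + " is still running."));
            return failed;
        }
        return terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }

    // Like waitForTerminatingJobManager: run the action only after the previous JobManager
    // for the same job id has terminated. Chains synchronously for simplicity.
    <T> CompletableFuture<T> runAfterTermination(String jobId, Function<String, CompletableFuture<T>> action) {
        return terminationFuture(jobId)
                .exceptionally(t -> {
                    throw new CompletionException(new IllegalStateException(
                            "Cannot submit job under the same job id " + jobId, t));
                })
                .thenCompose(ignored -> {
                    terminationFutures.remove(jobId);
                    return action.apply(jobId);
                });
    }

    public static void main(String[] args) {
        TerminationGate gate = new TerminationGate();
        CompletableFuture<Void> previous = new CompletableFuture<>();
        gate.terminationFutures.put("job-1", previous);
        CompletableFuture<String> run =
                gate.runAfterTermination("job-1", id -> CompletableFuture.completedFuture("started " + id));
        System.out.println("before termination, done = " + run.isDone());
        previous.complete(null); // the old JobManager has shut down
        System.out.println(run.join());
    }
}
```

Because the action is composed onto the termination future, a recovered job with the same id simply waits until the old JobManager has shut down instead of racing it.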
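As the Dispatcher source shows, it reacts to JobGraphs being added to and removed from the job graph store (onAddedJobGraph, onRemovedJobGraph); these callbacks are the actions through which job graphs are dispatched. Their internal call flow will be analyzed further in a later post.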
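On the add path, tryRunRecoveredJobGraph decides whether a recovered JobGraph should run, and onAddedJobGraph releases the graph in the SubmittedJobGraphStore again when it does not end up running. The sketch below is a hypothetical, single-threaded model of those branches, not Flink code: AddedJobGraphHandler, Outcome, SchedulingStatus and the three sets are invented for illustration, the asynchronous futures are dropped, and locking or releasing a graph is reduced to adding or removing its id from a set.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, single-threaded model of what happens when a JobGraph is reported as added.
// Not Flink API: ids are Strings, and the store, its locks and the runners are plain sets.
class AddedJobGraphHandler {
    // Stand-in for RunningJobsRegistry.JobSchedulingStatus.
    enum SchedulingStatus { PENDING, RUNNING, DONE }

    enum Outcome { NOT_IN_STORE, NOT_LEADER, ALREADY_RUNNING, STARTED, IGNORED_DONE }

    final Set<String> store = new HashSet<>();           // graphs in the SubmittedJobGraphStore
    final Set<String> lockedJobGraphs = new HashSet<>(); // graphs this dispatcher has recovered (locked)
    final Set<String> runningJobs = new HashSet<>();     // jobs with a JobManager runner

    // Like tryRunRecoveredJobGraph: only the leader may start the job, a running job is left
    // alone, and a job whose scheduling status is DONE is not started again.
    Outcome decide(String jobId, boolean hasLeadership, SchedulingStatus status) {
        if (!hasLeadership) {
            return Outcome.NOT_LEADER;
        }
        if (runningJobs.contains(jobId)) {
            return Outcome.ALREADY_RUNNING;
        }
        if (status != SchedulingStatus.DONE) {
            runningJobs.add(jobId); // in Flink: waitForTerminatingJobManager(jobId, jobGraph, this::runJob)
            return Outcome.STARTED;
        }
        return Outcome.IGNORED_DONE;
    }

    // Like onAddedJobGraph: the notification may be a false positive, so a graph that is no
    // longer in the store is skipped; a recovered graph that does not end up running is released.
    Outcome onAdded(String jobId, boolean hasLeadership, SchedulingStatus status) {
        if (!store.contains(jobId)) {
            return Outcome.NOT_IN_STORE; // recoverJob returned null
        }
        lockedJobGraphs.add(jobId); // recovering the graph locks it
        Outcome outcome = decide(jobId, hasLeadership, status);
        boolean isRunning = outcome == Outcome.STARTED || outcome == Outcome.ALREADY_RUNNING;
        if (!isRunning) {
            lockedJobGraphs.remove(jobId); // submittedJobGraphStore.releaseJobGraph(jobId)
        }
        return outcome;
    }

    public static void main(String[] args) {
        AddedJobGraphHandler handler = new AddedJobGraphHandler();
        handler.store.add("job-a");
        handler.store.add("job-b");
        System.out.println(handler.onAdded("job-a", true, SchedulingStatus.PENDING));
        System.out.println(handler.onAdded("job-b", true, SchedulingStatus.DONE));
        System.out.println(handler.onAdded("job-c", true, SchedulingStatus.PENDING));
    }
}
```

Keeping the lock only for jobs that actually run mirrors the comment in tryRunRecoveredJobGraph: the graph can be locked only once, so a job that is already executing must keep its lock.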
4. Summary
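The analysis above covers how Flink's Dispatcher obtains a job graph (JobGraph) and dispatches it. How the JobGraph is actually generated will be analyzed when we look at the conversion from StreamGraph to JobGraph, so it is not expanded on here.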
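The leadership step at the top of the Dispatcher listing, tryAcceptLeadershipAndRunJobs together with setNewFencingToken, can be modelled in the same simplified style. This is a sketch under assumptions: LeadershipAcceptor and stateClears are hypothetical, a UUID replaces DispatcherId as the fencing token, a Predicate replaces leaderElectionService.hasLeadership, a Function replaces runJob (the real code routes each recovered job through waitForTerminatingJobManager first), and CompletableFuture.allOf plays the role that FutureUtils.waitForAll plays in Flink.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of tryAcceptLeadershipAndRunJobs + setNewFencingToken using only the JDK.
// Not Flink API: a UUID replaces DispatcherId, a Predicate replaces the leader election check
// and a Function replaces runJob.
class LeadershipAcceptor {
    private UUID fencingToken;   // null while this dispatcher is not the leader
    int stateClears = 0;         // how often old leader state was cleared

    UUID getFencingToken() {
        return fencingToken;
    }

    // Like setNewFencingToken: if we were leader before, clear the old state first.
    void setNewFencingToken(UUID token) {
        if (fencingToken != null) {
            stateClears++; // in Flink: clearDispatcherState() terminates the JobManager runners
        }
        fencingToken = token;
    }

    CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(
            UUID newLeaderSessionId,
            Collection<String> recoveredJobs,
            Predicate<UUID> hasLeadership,
            Function<String, CompletableFuture<Void>> runJob) {
        if (!hasLeadership.test(newLeaderSessionId)) {
            // Leadership was lost before it could be accepted: recover nothing.
            return CompletableFuture.completedFuture(false);
        }
        setNewFencingToken(newLeaderSessionId);
        List<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
        for (String jobId : recoveredJobs) {
            runFutures.add(runJob.apply(jobId));
        }
        // allOf completes once every run future has completed (exceptionally if any failed).
        return CompletableFuture.allOf(runFutures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> true);
    }

    public static void main(String[] args) {
        LeadershipAcceptor acceptor = new LeadershipAcceptor();
        List<String> jobs = new ArrayList<>();
        jobs.add("job-1");
        jobs.add("job-2");
        boolean accepted = acceptor.tryAcceptLeadershipAndRunJobs(
                UUID.randomUUID(), jobs, token -> true,
                jobId -> CompletableFuture.completedFuture(null)).join();
        System.out.println("accepted = " + accepted);
    }
}
```

The boolean result tells the caller whether leadership was actually taken up; if it was lost in the meantime, no jobs are recovered and the fencing token is left unchanged.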