StreamGraph 构建和提交源码解析
StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。Flink 把每一个算子 transform
成一个对流的转换(比如 SingleOutputStreamOperator, 它就是一个 DataStream 的子类),并且
注册到执行环境中,用于生成 StreamGraph
它包含的主要抽象概念有
1、StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
2、StreamEdge:表示连接两个 StreamNode 的边
1.1 transformation算子封装
用户代码 执行map等算子时,以map为例
DataStream.map -> transform -> doTransform -> getExecutionEnvironment().addOperator(resultTransform)
-> transformations.add(transformation)
将代码中的算子全部封装到transformations 这个list里面,后面生成图时从这个transformations里面遍历生成StramGraph
从上方代码可以了解到, map 转换将用户自定义的函数 MapFunction 包装到 StreamMap
这个 Operator 中,再将 StreamMap 包装到 OneInputTransformation,最后该 transformation 存
到 env 中,当调用 env.execute 时,遍历其中的 transformation 集合构造出 StreamGraph
其分层实现如下图所示
另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一
些只是逻辑概念,比如 union、 split/select、 partition 等。如下图所示的转换树,在运行时
会优化成下方的操作图
union、 split/select(1.12 已移除)、 partition 中的信息会被写入到 Source – > Map 的
边中。通过源码也可以发现 UnionTransformation , SplitTransformation(1.12 移除) ,
SelectTransformation(1.12 移除) ,PartitionTransformation 由于不包含具体的操作所以都没
有 StreamOperator 成员变量,而其他 StreamTransformation 的子类基本上都有。
1.2 StramGraph构建源码分析
生成StramGraph入口:
StreamExecutionEnvironment.execute(getStreamGraph(jobName));
-> getStreamGraph -> StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
-> generate -> StreamGraphGenerator.transform(transformations是一个list,依次存放了 用户代码里的 算子)
-> translate -> SimpleTransformationTranslator.translateForStreaming
-> translateForStreamingInternal
区分 map之类的转换算子(OneInputTransformationTranslator)
keyby值类的分区算子(PartitionTransformationTranslator)
-> OneInputTransformationTranslator.translateForStreamingInternal -> translateInternal
-> streamGraph.addOperator(生成StreamNode入口)
-> addOperator-> addNode -> StreamNode vertex = new StreamNode
-> streamGraph.addEdge(生成StreamEdge入口)
-> streamGraph.addEdgeInternal
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
/*TODO 当上游是侧输出时,递归调用,并传入侧输出信息*/
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
/*TODO 当上游是partition时,递归调用,并传入partitioner信息*/
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else {
/*TODO 真正构建StreamEdge*/
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
/*TODO 未指定partitioner的话,会为其选择 forward 或 rebalance 分区*/
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
// TODO 健康检查,forward 分区必须要上下游的并发度一致
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
/*TODO 创建 StreamEdge*/
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
partitioner, outputTag, shuffleMode);
/*TODO 将该 StreamEdge 添加到上游的输出,下游的输入*/
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
再来看下对逻辑转换(partition、 union 等)的处理,如下是 transformPartition 函数的源码:
PartitionTransformationTranslator.java
private Collection<Integer> translateInternal(){
for (Integer inputId: context.getStreamNodeIds(input)) {
/*TODO 生成一个新的虚拟id*/
final int virtualId = Transformation.getNewNodeId();
/*TODO 添加一个虚拟分区节点,不会生成StreamNode*/
streamGraph.addVirtualPartitionNode(
inputId,
virtualId,
transformation.getPartitioner(),
transformation.getShuffleMode());
resultIds.add(virtualId);
}
}
对partition 的转换没有生成具体的 StreamNode 和 StreamEdge,而是添加一个虚节点。
当 partition 的下游 transform(如 map)添加 edge 时(调用 StreamGraph.addEdge),会把
partition 信息写入到 edge 中
实例分析
DataStream<String> text = env.socketTextStream(hostName, port);
text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print();
如上程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区 shuffle。分析该程序是如何生成 StreamGraph 的
首先会在 env 中生成一棵 transformation 树,用 List<Transformation<?>>保存。其结构
图如下
其中符号*为 input 指针,指向上游的 transformation,从而形成了一棵 transformation
树。然后,通过调用 StreamGraphGenerator.generate(env, transformations)来生成
StreamGraph。自底向上递归调用每一个 transformation,也就是说处理顺序是
Source->FlatMap->Shuffle->Filter->Sink
1) 首先处理的 Source,生成了 Source 的 StreamNode。
2) 然后处理的 FlatMap,生成了 FlatMap 的 StreamNode,并生成 StreamEdge 连接上游
Source 和 FlatMap。由于上下游的并发度不一样(1:4),所以此处是 Rebalance 分区。
3) 然后处理的 Shuffle,由于是逻辑转换,并不会生成实际的节点。将 partitioner 信息暂存在 virtuaPartitionNodes 中。
4) 在处理 Filter 时,生成了 Filter 的 StreamNode。发现上游是 shuffle,找到 shuffle 的上游FlatMap,
创建 StreamEdge 与 Filter 相连。并把 ShufflePartitioner 的信息写到 StreamEdge中。
5) 最后处理 Sink,创建 Sink 的 StreamNode,并生成 StreamEdge 与上游 Filter 相连。由于
上下游并发度一样(4:4),所以此处选择 Forward 分区。
最后可以通过 UI 可视化 来观察得到的 StreamGraph