目录
Flink如何判断算子能组成一个operate chain?
本文首先总结在什么情况下算子能组成一个operate chain,并根据wordcount代码一步步进入真正判断isChainable进行源码解析(Flink 1.15.2版本)
Flink如何判断算子能组成一个operate chain?
- 当满足以下情况时
- 下游算子的入边必须是1,下游算子不能是connect、union,join
- 上游算子和下游算子都在同一个SlotSharingGroup
- 下游算子为不为null 或者上游算子不为为null
- downStreamOperator 不属于YieldingOperatorFactory类并且Source类型不是LegacySource
- upStreamOperator.ChainingStrategy为 ALWAYS、HEAD、HEAD_WITH_SOURCES类型
- downStreamOperator.ChainStrategy为ALWAYS、HEAD_WITH_SOURCES(只有upstream 是source)
- outputPartitioner 为ForwardPartitioner,也就是算子下游的分区为ForwardParttioner
- StreamEdge的ExchangeMode不为BATCH模式
- streamGraph.isChainingEnabled为true ,job没有调用disableChaining()
在这里我用wordcount代码进行讲解分析
WordCount.java
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=
StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost",9999)
.flatMap(new FlatMapFunction<String,Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words=value.split(",");
for (String word:words) {
out.collect(new Tuple2<>(word,1));
}
}
}).keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1).print();
env.execute();
}
}
在StreamingJobGenerator.java中的createChain()方法中进行isChainable判断
进入到isChainable(outEdge,streamGraph)方法中
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}
以wordcount代码为例,首先进入的edge是source->flatmap
downStreamVertex: 为下游StreamNode也就是flatmap,判断flatmap的入边是否为1,满足条件后进入isChanableInput(edge,streamGraph)
areOperatorChainable()方法主要是做了ChainingStrategy判断
在 source->flatmap过程中 outputPartitioner=REBALANCE不是ForwardPartitioner类所以,source->flatmap不是一个operatorChain
flatmap->window分区为 KeyGroupStreamPartitioner也不是ForwardPartitioner,也不能组成operatorChain
window->sink 分区为FORWARD,可以称为operatorChain
当然也可以在算子层面添加不同的slot共享组、设置disableChaining来进行测试。
总结:
当算子上游不是union、connect、join并且算子分区为Forwardpartitioner方式,并且上下游算子在同一个slot共享组。没有设置disableChaining的情况下,会被组成一个operator chain