在 Flink 应用程序中,其实所有的操作,都是 StreamOperator,分为 SourceOperator,SinkOperator,
StreamOperator,然后能被优化的 Operator 就会 chain 在一起,形成一个OperatorChain
三个类似的概念:
1、Function
2、Operator
3、Transformation
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
/*************************************************
* TODO
* 注释: 解析 host 和 port
*/
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch(Exception e) {
System.err.println(
"No port specified. Please run 'SocketWindowWordCount " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " + "type the input text into the command line");
return;
}
/*************************************************
* TODO
* 注释: 获取 StreamExecutionEnvironment
* 它呢,还是 跟 Spark 中的 SparkContext 还是有区别的!
*/
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*************************************************
* TODO
* 注释: 加载数据源得到数据抽象:DataStream
* 其实最终,只是创建了一个 DataStreamSource 对象,然后把 SourceFunction(StreamOperator)和 StreamExecutionEnvironment
* 设置到了 DataStreamSource 中, DataStreamSource 是 DataStream 的子类
* -
* DataStream 的主要分类:
* DataStreamSource 流数据源
* DataStreamSink 流数据目的地
* KeyedStream 按key分组的数据流
* DataStream 普通数据流
* -
* 关于函数理解:
* Function 传参
* Operator Graph 中抽象概念
* Transformation 一种针对流的逻辑操作
* 最终: Function ---> Operator ---> Transformation
*/
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
// TODO 注释: 讲算子生成 Transformation 加入到 Env 中的 transformations 集合中
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for(String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
// TODO 注释: 依然创建一个 DataStream(KeyedStream)
.keyBy(value -> value.word).timeWindow(Time.seconds(5))
// TODO 注释:
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
/*************************************************
* TODO
* 注释: 提交执行
*/
env.execute("Socket Window WordCount");
}
一、执行代码
1、ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Flink 应用程序的执行,首先就是创建运行环境 StreamExecutionEnvironment,一般在企业环境中,
都是通过 getExecutionEnvironment() 来获取 ExecutionEnvironment,如果是本地运行的话,则会获
取到:LocalStreamEnvironment,如果是提交到 Flink 集群运行,则获取到:StreamExecutionEnvironment。
StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:
1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源
2、提供了 setParallelism() 设置程序的并行度
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责Job执行的一些行为
配置管理。还管理了 Configuration 管理一些其他的配置
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations成员变量,
该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些Transformation
按照逻辑拼接起来,就能得到 StreamGragh(Transformation ->StreamOperator -> StreamNode)
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph
2、addsource 返回DataStreamSource
env.socketTextStream -> socketTextStream -> addSource(){
/*************************************************
* TODO
* 注释: 构建 SourceOperator
* 它是 SourceFunction 的子类,也是 StreamOperator 的子类
*/
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
/*************************************************
* TODO
* 注释: 返回 DataStreamSource
* 关于这个东西的抽象有四种:
* 1、DataStream
* 2、KeyedDataStream
* 3、DataStreamSource
* 4、DataStreamSink
*/
return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
}
3、flatMap
text.flatMap -> DataStream.flatMap(){
/*************************************************
* TODO
* 注释: flink把每一个算子transform成一个对流的转换
* 并且注册到执行环境中,用于生成StreamGraph
* -
* 第一步:用户代码里定义的UDF会被当作其基类对待,然后交给 StreamFlatMap 这个 operator 做进一步包装。
* 事实上,每一个Transformation都对应了一个StreamOperator。
* -
* flink流式计算的核心概念,就是将数据从输入流一个个传递给Operator进行链式处理,最后交给输出流的过程
* -
* StreamFlatMap 是一个 Function 也是一个 StreamOperator
* -
* StreamFlatMap = StreamOperator
* flatMapper = Function
* -最终调用 transform 方法来把 StreamFlatMap 这种StreamOperator 转换成 Transformation
* 最终加入到 StreamExectiionEnvironment 的 List<Transformation<?>> transformations
*/
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
transform -> doTransform
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
/*************************************************
* TODO
* 注释: 构建: OneInputTransformation
* 由于 flatMap 这个操作只接受一个输入,所以再被进一步包装为 OneInputTransformation
*/
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, operatorName, operatorFactory, outTypeInfo,
environment.getParallelism());
/*************************************************
* TODO
* 注释: 构建: SingleOutputStreamOperator
*/
@SuppressWarnings({
"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
/*************************************************
* TODO 重点
* 注释: 把 Operator 注册到执行环境中,用于生成 StreamGraph
* 最后,将该 transformation 注册到执行环境中,当执行 generate 方法时,生成 StreamGraph 图结构。
*/
getExecutionEnvironment().addOperator(resultTransform);
/*************************************************
* TODO
* 注释:
* SingleOutputStreamOperator 也是 DataStream 的子类,也就是返回了一个新的 DataStream
* 然后调用新的 DataStream 的某一个算子,又生成新的 StreamTransformation,
* 继续加入到 StreamExecutionEnvironment 的 transformations
*/
return returnStream;
}
getExecutionEnvironment().addOperator(resultTransform) {
/*************************************************
* TODO
* 注释: 将 Transformation 加入 transformations 集合
*/
this.transformations.add(transformation);
}
4、keyBy(value -> value.word) 创建一个 KeyedStream
5、timeWindow(Time.seconds(5)) WindowedStream
6、reduce 和flatMap类似分析
7、windowCounts.print() addSink 返回 DataStreamSink
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// configure the type if needed
if(sinkFunction instanceof InputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
// TODO 注释: SinkOperator 会被封装成 StreamSink
// TODO 注释: Function Operator Transformation 可以认为是用于不同地方的同一个概念。
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
/*************************************************
* TODO
* 注释: 构造一个 DataStreamSink 流对象
*/
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
/*************************************************
* TODO
* 注释: 将算子加入到 ExecutionEnvironment 中的 transformations 集合中
*/
getExecutionEnvironment().addOperator(sink.getTransformation());
// TODO 注释: 返回 DataStreamSink
return sink;
}
注意:
source keyby window 未加入transformations集合
keyby 属于 PartitionTransformation 于不包含具体的操作 所以都没有 StreamOperator 成员变量
参考上篇 transformation算子封装
source – LegacySourceTransformation
flatmap,reduce sink 加入 transformations集合
source – LegacySourceTransformation
keyby – PartitionTransformation
flatMap reduce – OneInputTransformation
sink – SinkTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation
-> super
public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
this.id = getNewNodeId();
this.name = Preconditions.checkNotNull(name);
this.outputType = outputType;
this.parallelism = parallelism;
this.slotSharingGroup = null;
}
this.id = getNewNodeId(); -> idCounter++; id 自增
在 StreamGraph addNode时
* 这个 vertexID 就是 Transformation id transform.getId()
* Transformation id 在创建 Transformation 时生成
* 即一个Transformation 对应一个 StreamNode
protected StreamNode addNode(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass, StreamOperatorFactory<?> operatorFactory, String operatorName) {
if(streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
/*************************************************
* TODO
* 注释: 生成一个 StreamNode
*/
StreamNode vertex = new StreamNode(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, new ArrayList<OutputSelector<?>>(),
vertexClass);
/*************************************************
* TODO
* 注释: 添加一个 StreamNode
*
* 这个 vertexID 就是 Transformation id transform.getId()
* Transformation id 在创建 Transformation 时生成
* 即一个Transformation 对应一个 StreamNode
*/
streamNodes.put(vertexID, vertex);
return vertex;
}
8、env.execute(“Socket Window WordCount”); 提交执行
二、 提交执行
客户端 先构建StreamGraph,再构建JobGraph,再提交到服务端(WebMonitorEndpoint)
服务端接收到请求后,先创建jobMaster,再将传过来的JobGraph构建ExecutionGraph
再启动jobMaster,并向ResourceManager注册并维持心跳
再调度task执行
env.execute -> execute(getStreamGraph(jobName));
-> (2.1) getStreamGraph 生产StreamGraph
-> executeAsync -> execute -> AbstractSessionClusterExecutor.execute(){
(2.2) JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
/*************************************************
* TODO ->
* 注释: 提交执行
* 1、MiniClusterClient 本地执行
* 2、RestClusterClient 提交到 Flink Rest 服务接收处理
*/
clusterClient.submitJob(jobGraph)
}
-> RestClusterClient.submitJob -> sendRetriableRequest(){
/*************************************************
* TODO
* 注释: restClient = RestClient
* 注意这儿: 提交 Request 给 WebMonitorEndpoint, 最终由 JobSubmitHandler 来执行请求处理
* 通过 Http Restful 方式提交
*/
restClient.sendRequest()
}
-> RestClient.sendRequest -> submitRequest(){
// 通过 Netty 客户端发送请求给 Netty 服务端
final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
/**
* 注释: 发送请求 到 WebMonitorEndpoint 的 Netty 服务端
* 最终由: JobSubmitHandler 来执行处理
*/
httpRequest.writeTo(channel);
}
最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 的 handleRequest 来执行处理
handleRequest -> DispatcherGateway.submitJob -> internalSubmitJob
-> persistAndRunJob -> runJob
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
/*************************************************
* TODO ->
* 注释: 创建 JobManagerRunner
* 在这里面会做一件重要的事情:
* 1、创建 JobMaster 实例
* 2、在创建 JobMaster 的时候,同时会把 JobGraph 编程 ExecutionGraph
* -
* 严格来说,是启动 JobMaster, 那么这个地方的名字,就应该最好叫做: createJobMasterRunner
* Flink 集群的一两个主从架构:
* 1、资源管理: ResourceManager + TaskExecutor
* 2、任务运行: JobMaster + StreamTask
*/
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
/*************************************************
* TODO
* 注释: 启动 JobManagerRunner
*/
return jobManagerRunnerFuture.thenApply(
/*************************************************
* TODO
* 注释: 提交任务 == start JobManagerRunner
*/
FunctionUtils.uncheckedFunction(this::startJobManagerRunner)
).thenApply(
FunctionUtils.nullFn()
).whenCompleteAsync(
(ignored, throwable) -> {
if(throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
}, getMainThreadExecutor());
}
createJobManagerRunner 创建jobMaster 并将JobGraph构建为ExecutionGraph
--> createJobManagerRunner -> createJobManagerRunner -> new JobManagerRunnerImpl -> createJobMasterService
-> new JobMaster(){
this.schedulerNG = createScheduler(jobManagerJobMetricGroup);} -> createInstance
-> new DefaultScheduler -> super -> public SchedulerBase()
(2.3){
this.executionGraph = createAndRestoreExecutionGraph()}
--> JobGraph 转换成 ExecutionGraph 入口
startJobManagerRunner 启动JobMaster
--> startJobManagerRunner -> ... -> startJobMaster -> JobMaster.start() -> startJobExecution(){
/*************************************************
* TODO
* 注释: 初始化一些必要的服务组件
* JobMaster 的注册和心跳
*/
startJobMasterServices();
log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
/*************************************************
* TODO
* 注释: 开始调度执行
* JobMaster 调度 StreamTask 去运行
*
* TODO 1.slot管理(申请和释放)
* 2.task部署和提交
*/
resetAndStartScheduler();
}
--> startJobMasterServices(){
startHeartbeatServices // 启动心跳服务
reconnectToResourceManager // 向 ResourceManager 注册该 JobMaster
}
/** 注册流程: 以JobMaster向ResourceManager为例
* 1、createNewRegistration -> generateRegistration -> JobMaster.ResourceManagerConnection.generateRegistration
* 2、startRegistration -> register -> invokeRegistration -> JobMaster.ResourceManagerConnection.generateRegistration.invokeRegistration
* 3、onRegistrationSuccess -> onRegistrationSuccess -> JobMaster.ResourceManagerConnection.onRegistrationSuccess
*/
-> reconnectToResourceManager -> ResourceManagerConnection.start(){
createNewRegistration(){
generateRegistration()
onRegistrationSuccess()
}
startRegistration() -> register -> invokeRegistration
}
JobMaster.resetAndStartScheduler 启动调度 JobMaster 调度 StreamTask 去运行
1.slot管理(申请和释放)
2.task部署和提交
-> startScheduling -> DefaultScheduler.startScheduling(); -> startSchedulingInternal
-> EagerSchedulingStrategy.startScheduling -> allocateSlotsAndDeploy
-> schedulerOperations.allocateSlotsAndDeploy
-> DefaultScheduler.allocateSlotsAndDeploy(){
allocateSlots //申请Slot 核心入口
waitForAllSlotsAndDeploy // 部署运行
}
Slot 管理(申请和释放)源码解析
大体上,分为四个大步骤
1、JobMaster 发送请求 到ResourceManager 申请 slot
2、ResourceManager 接收到请求,执行 slot请求处理
3、TaskManager 处理 ResourceManager 发送过来的 Slot 请求
4、JobMaster 接收到 TaskManager 发送过来的 Slot 申请处理结果
对应的一些组件:
1、ResourceManager 管理所有的 TaskManager(TaskExecutor)
2、TaskExecutor 中关于资源的管理,使用 slot的抽象:
slot 的状态管理
专门有一个 做 slot 管理的 SlotManagerImpl
3、JobMaster 申请slot, 管理组件:SlotPool
slot共享! 既然有 slot 共享的概念,如果要执行一个 Task,其实就可以先尝试从 SlotPool 中申请,
如果申请不到,则再向 ResourceManager 申请
1、 JobMaster 发送请求向ResouceManager申请 slot
allocateSlots -> allocateSlotsFor -> NormalSlotProviderStrategy.allocateSlot
-> slotProvider.allocateSlot -> SchedulerImpl.allocateSlot -> allocateSlotInternal
-> allocateSingleSlot -> requestNewAllocatedSlot -> requestNewAllocatedSlot
-> requestNewAllocatedSlotInternal -> requestSlotFromResourceManager
2、ResourceManager 接收到请求,执行 slot请求处理
-> resourceManagerGateway.requestSlot -> slotManager.registerSlotRequest
-> internalRequestSlot
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile)).ifPresent(
/*************************************************
* TODO
* 注释: 向 TaskManager 申请 Slot
*/
taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest)
).ifNotPresent(
/*************************************************
* TODO 重点 如果ResourceManager无slot 则 创建新的TaskExecutor,
* 如向YarnResourceManager 申请资源创建Container,启动YarnTaskExecutorRunner
* StandaloneResourceManager 是false ,即Standalon 无slot时(集群资源不够用时)不能启动新的TaskManager 创建TaskExecutor
*
* 注释: 使用待处理的任务管理器插槽满足待处理的插槽请求
*/
() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)
);
}
yarn 流程
fulfillPendingSlotRequestWithPendingTaskManagerSlot
-> allocateResource -> resourceActions.allocateResource -> ResourceManager.ResourceActionsImpl.allocateResource
-> ActiveResourceManager.startNewWorker -> requestNewWorker -> resourceManagerDriver.requestResource(
-> YarnResourceManagerDriver.requestResource
创建和启动好 taskExecutor 并已向ResourceManager注册后走 allocateSlot 向 TaskManager 申请 Slot
taskExecutorgateway.requestSlot
3、TaskManager 处理 ResourceManager 发送过来的 Slot 请求
TaskExecutor.requestSlot(){
allocateSlot -> taskSlot = new TaskSlot
offerSlotsToJobManager -> internalOfferSlotsToJobManager -> JobMasterGateway.offerSlots();
}
4、JobMaster 接收到 TaskManager 发送过来的 Slot 申请处理结果
JobMaster.offerSlots(); -> SlotPoolImpl.offerSlots
-> offerSlot -> tryFulfillSlotRequestOrMakeAvailable
task部署和提交
waitForAllSlotsAndDeploy
2.1 StreamGraph
2.2 JobGraph
AbstractSessionClusterExecutor.execute
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
-> FlinkPipelineTranslationUtil.getJobGraph -> getJobGraph -> StreamingJobGraphGenerator.createJobGraph
-> new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); -> setChaining
-> createChain(){
/*************************************************
* TODO
* 注释: 判断是否可以 chain 在一起!
* 当前这个地方做的事情,只是当前这个 StreamNode 和它的直接下游 StreamNode
*/
for(StreamEdge outEdge : currentNode.getOutEdges()) {
/*************************************************
* TODO 重点 1 isChainable
* 注释: 判断一个 StreamGraph 中的一个 StreamEdge 链接的上下游 Operator(StreamNode) 是否可以 chain 在一起
*
*/
if(isChainable(outEdge, streamGraph)) {
// TODO 注释: 加入可 chain 集合
chainableOutputs.add(outEdge);
} else {
// TODO 注释: 加入不可 chain 集合
nonChainableOutputs.add(outEdge);
}
}
/*************************************************
* TODO 重点 2 createJobVertex
* 注释: 把chain在一起的多个 Operator 创建成一个 JobVertex
* 如果当前节点是 chain 的起始节点, 则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig
* createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig
*
* --总结:
* StreamGraph -> JobGrahph
* 判断哪些StreamEge可以执行优化(chain),将 多个StreanNode 并成一个 JobVertex
*
* StreamNode_A -> (StreamNode_B -> StreamNode_C)
* B,C chain在一起, startNodeId = B // Integer startNodeId = chainInfo.getStartNodeId();
* 当 currentNodeId = B 则 B 创建 JobVertex
* 当 currentNodeId = C 则 C 不创建 JobVertex
*/
StreamConfig config = currentNodeId.equals(startNodeId) ?
createJobVertex(startNodeId, chainInfo) : new StreamConfig(new Configuration());
// TODO 注释: chain 在一起的多条边 connect 在一起
for(StreamEdge edge : transitiveOutEdges) {
/**
* 重点 3 根据 StreamNode和 StreamEdge 生成 JobEge 和 IntermediateDataSet 用来将JobVertex和JobEdge相连
* TODO ->
*/
connect(startNodeId, edge);
}
}
重点 1 isChainable
-> isChainable(){
// TODO 条件1. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入) A -> B B A 一一对应 如果shuffle类,那么B的入度就 >= 2
return downStreamVertex.getInEdges().size() == 1
// TODO 注释: 条件2. 上下游算子实例处于同一个SlotSharingGroup中
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
// TODO -> 注释: 这里面有 3 个条件 条件345
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
// TODO 注释:条件6 两个算子间的物理分区逻辑是ForwardPartitioner
// (无shuffle,当前节点的计算数据,只会发给自己 one to one 如上游50个task 计算完直接发送给下游50个task)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
// TODO 注释:条件7 两个算子间的shuffle方式不等于批处理模式
&& edge.getShuffleMode() != ShuffleMode.BATCH
// TODO 注释:条件8 上下游算子实例的并行度相同
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
// TODO 注释:条件9 启动了 chain
&& streamGraph.isChainingEnabled();
}->areOperatorsChainable()
->static boolean areOperatorsChainable(StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) {
// TODO 注释: 获取 上游 Operator
StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
// TODO 注释: 获取 下游 Operator
StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();
// TODO 注释: 如果上下游有一个为空,则不能进行 chain 条件3、前后算子不为空
if(downStreamOperator == null || upStreamOperator == null) {
return false;
}
/*************************************************
* TODO
* 注释:
* 条件4、上游算子的链接策略是 always 或者 head
* 条件5、下游算子的链接策略必须是 always
*/
if(upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER ||
downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
return false;
}
重点 2 createJobVertex
-> createJobVertex(){
byte[] hash = chainInfo.getHash(streamNodeId);
JobVertexID jobVertexId = new JobVertexID(hash);
jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
}
由此可见JobVertex就是StreamNode对应的streamNodeId 关联
重点 3 根据 StreamNode和 StreamEdge 生成 JobEge 和 IntermediateDataSet 用来将JobVertex和JobEdge相连
connect(startNodeId, edge)-> jobEdge = downStreamVertex.connectNewDataSetAsInput
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
// TODO -> input是JobVertex 即 JobVertex 创建 IntermediateDataSet
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
// TODO 创建 JobEdge
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
// TODO IntermediateDataSet -> JobEdge 即 IntermediateDataSet 的消费者是 JobEdge
dataSet.addConsumer(edge);
return edge;
}
--> JobVertex.createAndAddResultDataSet
-> IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this); 这个this 就是 JobVertex
public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
this.id = checkNotNull(id);
// producer 是 JobVertex 即IntermediateDataSet 的生产者是JobVertex JobVertex -> IntermediateDataSet
this.producer = checkNotNull(producer);
this.resultType = checkNotNull(resultType);
}
/**
* 至此形成流图 JobVertex -> IntermediateDataSet -> JobEdge
*/
2.3 ExecutionGraph
ExecutionGraph executioinGraph = SchedulerBase.createAndRestoreExecutionGraph()
-> buildGraph -> executionGraph.attachJobGraph(
ExecutionGraph 事实上只是改动了 JobGraph 的每个节点,而没有对整个拓扑结构进行变动,所以代码里只是挨个遍历 jobVertex 并进行处理)
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
// 遍历 JobVertex 执行并行化生成 ExecutioinVertex
for(JobVertex jobVertex : topologiallySorted) {
// 一个 JobVertex 对应的创建一个 ExecutionJobVertex
ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, maxPriorAttemptsHistoryLength, rpcTimeout, globalModVersion, createTimestamp);
/*************************************************
* TODO
* 注释: 处理 JobEdge 和 IntermediateResult 和 ExecutionJobVertex中的 ExecutionVertex
* 对每个 JobEdge,获取对应的 IntermediateResult,并记录到本节点的输入上
* 最后,把每个 ExecutorVertex 和对应的 IntermediateResult 关联起来
*/
ejv.connectToPredecessors(this.intermediateResults);
}
-> public ExecutionJobVertex(){
// TODO 注释: 初始化 producedDataSets 数组中的每个 IntermediateResult
// JobGraph里的IntermediateDataSet 对应 ExecutionGraph里的IntermediateResult
for(int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
this.producedDataSets[i] = new IntermediateResult(result.getId(), this, numTaskVertices, result.getResultType());
}
// TODO 注释: 创建 ExecutionVertex 对象
for(int i = 0; i < numTaskVertices; i++) {
ExecutionVertex vertex = new ExecutionVertex(this, i, producedDataSets, timeout, initialGlobalModVersion, createTimestamp,maxPriorAttemptsHistoryLength);
this.taskVertices[i] = vertex;
}
----------------------------------------------------
}
/*************************************************
* TODO 核心
* 注释: 处理 JobEdge 和 IntermediateResult 和 ExecutionJobVertex中的 ExecutionVertex
* 对每个 JobEdge,获取对应的 IntermediateResult,并记录到本节点的输入上
* 最后,把每个 ExecutorVertex 和对应的 IntermediateResult 关联起来
*/
ejv.connectToPredecessors(this.intermediateResults){
/*************************************************
* TODO
* 注释: 根据并行度来设置 ExecutionVertex
*/
for(int i = 0; i < parallelism; i++) {
ExecutionVertex ev = taskVertices[i];
ev.connectSource(num, ires, edge, consumerIndex);
} ->
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
for(int i = 0; i < sourcePartitions.length; i++) {
IntermediateResultPartition irp = sourcePartitions[i];
// this 这里就是ExecutionVertex
//得到流图 IntermediateResultPartition -> ExecutionEdge -> ExecutionVertex
edges[i] = new ExecutionEdge(irp, this, inputNumber);
}
/**
* 并行化后 看图
* IntermediateResult(2个partition) ExecutionEdge ExecutionJobVertex
* ExecutionEdge -------- ExecutionVertex
* IntermediateResultPartition
* ExecutionEdge -------- ExecutionVertex
* ExecutionVertex[] taskVertices;
表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition
IntermediateResult[] producedDataSets;
和JobGraph中的IntermediateDataSet一一对应。一个IntermediateResult包含多个 IntermediateResultPartition,其个数等于该operator的并发度
*/
return edges;
}
}
}