flink任务提交与执行2-ExecutionEnvironment初始化

上一节已经说过了,PackagedProgram最后会调用callMainMethod以反射方式调用提交的flink程序中的main方法,而main的第一行语句基本上就是ExecutionEnvironment的初始化.下面我们来了解ExuctionEnvironment的初始化

1. StreamExecutionEnvironment初始化

一般來説有ExecutionEnvinronment和streamExecutionEnvinronment,本文主要指StreamExecutionEnvironment

1. StreamExecutionEnvironment简单介绍

StreamExecutionEnvironment主要是一个流式任务运行的上下文环境。StreamExecutionEnvironment主要提供了控制任务执行,例如设置并行度,同时与外界进行交互,例如获取数据源。

1639200229(1).png

图一

2. StreamExectuionEnvironment继承关系

如图一所示,为StreamExecutionEnvironment的关系类图

  • RemoteStreamExecution:

通过远程参数将流式作业提交到集群运行,且创建过程中需要以应用程序的JAR包的作为参数

  • LocalStreamExecution: 作为流式作业的本地执行环境,和LocalEnvironment一样在本地创建JVM中创建MiniCluster,并将流式作业运行在MiniCluster中
  • StreamContextEnvironment:

用在Cli-Client客户端方式提交流式作业,且基于ContextEnvironment构建。 ContextEnvironment是ExecutionEnvironment的子类,ExecutionEnvironment是基于离线批计算的ExecutionEnvironment。用于以客户端命令行的方式提交Flink作业的场景

  • StreamPlanEnvironment:

和OptimizerPlanEnvironment一样,用与获取Streamgraph结构,本身不参与作业的执行

3. StreamExecutionEnvironment组成结构

StreamExecutionEnvironment.png

图二

StreamExecutionEnvironment有以下部分

  • contextEnvironmentFactory:在命令行模式提交作业时,当用户使用客户端提交作业时,就会通过contextEnvironmentFactory创建StreamExecutionEnvironment。类型是StreamExecutionEnvironmentFactory
  • transformations: 一个tranformation的arraylist,用于构建StreamGraph对象
  • defaultbackend:状态存储后端,默认实现为MemoryStateBackend
  • executorServiceLoader:通过Java SPI加载PipelineExecutorFactory的实现类。然后生成PipelineExecutor
  • UserClassLoader: 为用户编写代码提供的类加载器,当用户提交Flink应用程序的时候,userClassloader会加载jar文件中的所有类,并伴随应用程序的产生,构建,提交,运行全过程
  • jobListener:在应用程序通过客户端提交到集群后,向客户端通过应用程序的执行结果。 例如作业执行状态等

4. flink任务提交与执行-Excution创建阶段

1.StreamExecutionEnvironment创建

  • 基于上文可知,在客户端提交的时候,有PackagedProgram.callMainTheod方法最后会执行用户提交的flink的程序的main方法。同时,在创建和运行PackageProgram的时候,会同时创建ContextEnvironment
  • ContextEnvironment会使用ContextEnvironment.setAsContext(factory)方法将ContextEnvironmentFactory设定到ExecutionEnvironment的contextEnvironmentFactory和threadLocalContextEnvironmentFactory变量中,因此通过Packaged创建的contextEnvironmentFactory对StreamEnvironment进行初始化
  • 由上可知,我们已经获取了contextEnvironmentFactory,具体的调用方法为StreamExecutionEnvironment.getExecutionEnvironment中创建ExecutionEnvironment,
  • 在StreamExecutionEnvironment.getExecutionEnvironment中,主要是以下逻辑,先从threadLocalContextEnvironmentFactory, contextEnvironmentFactory中获取工厂类
  • 然后利用工厂类获取StreamExecutionEnvironment,如果工厂类都为空,无法创建实例,则调用StreamExecutionEnvironment::createStreamExecutionEnvironment
  • StreamExecutionEnvironment::createStreamExecutionEnvironment方法根据不同的ExecutionEnvironment env来创建不同的StreamExecutionEnvironment

2.生成StreamGraph image.png

图三

如图三所示为StreamGraph创建的流程图,以下为具体步骤:

  1. 由上文可知,PackagedProgram利用反射调用main方法,在main方法中执行StreamExecutionEnvironment.execute("jobname").
  2. 在StreamExecutionEnvironment.execute方法中,会进一步执行StreamExecutionEnvironment.getStreamGraph方法,获取StreamGraph
  3. 在StreamExecutionEnvironment.getStreamGraph方法中,调用StreamExecutionEnvironment.getStreamGraphGenerator方法获取StreamGraphGenerator
  4. StreamGraphGenerator的generate()方法生成StreamGraph
  5. 将StreamGraph返回StreamExecutionEnvironment

3.执行executeAsync方法将StreamGraph并生成jobGraph

  1. 在StreamExecutionEnvironment.execute方法中,收到getStreamGraph(jobname)返回来的SteamGraph,进一步执行该方法中的getStreamGraphGenerator.executeAsync方法,异步的触发整个job的执行
  2. 在getStreamGraphGenerator.executeAsync方法中会利用上面提及的executorServiceLoader来加载PipelineExecutorFactory的实现类,并且生成PipelineExecutorFactory
  3. PipelineExecutorFactory加载完成,调用PipelineExecutorFactory.getExecutor方法创建PipeLineExecutor.
  4. 接着调用PipeLineExecutor.execute()方法执行创建好的StreamGraph方法,然后会向StreamExecutionEnvironment异步返回JobClientFuture
  5. StreamExecutionEnvironment调用JobClientFuture.get()方法得到同步的JobClient对象
  6. jobClient将执行Job的结果返回给StreamExecutionEnvironment
  7. 最后使用上面提及的JobListeners来遍历JobListener获取对应的结果,返回给客户端

5. 一些比较重要的方法

1.StreamGrapthgenerator.tranform()
该方法主要是将tranform节点根据不同的Tranformation类型,选择不同的解析逻辑,然后将所有的Tranformation转关成StreamGraph中对应的节点,完成整个StreamGraph对象的构建

2.FlinkPipelineTranslationUtil.getJobGraph
该方法是在PipelineExecutor.excute中调用,主要是将Pipeline转换为JobGraph,由于StreamGraph是Pipeline的子类,因此本质就是streamGraph的转换.接着会调用到StreamGraph.getJobGraph,最后在StreamingJobGraphGenerator 对象中将StreamGraph对象转换为JobGraph数据结构,相对于StreamGraph,JobGraph增加了系统执行参数及依赖等信息,如作业依赖的JAR包

6. 参考文献

flink设计与实现:核心原理与源码解析

猜你喜欢

转载自juejin.im/post/7041199990470869028