上一节已经说过了,PackagedProgram最后会调用callMainMethod以反射方式调用提交的flink程序中的main方法,而main的第一行语句基本上就是ExecutionEnvironment的初始化.下面我们来了解ExuctionEnvironment的初始化
1. StreamExecutionEnvironment初始化
一般來説有ExecutionEnvinronment和streamExecutionEnvinronment,本文主要指StreamExecutionEnvironment
1. StreamExecutionEnvironment简单介绍
StreamExecutionEnvironment主要是一个流式任务运行的上下文环境。StreamExecutionEnvironment主要提供了控制任务执行,例如设置并行度,同时与外界进行交互,例如获取数据源。
2. StreamExectuionEnvironment继承关系
如图一所示,为StreamExecutionEnvironment的关系类图
- RemoteStreamExecution:
通过远程参数将流式作业提交到集群运行,且创建过程中需要以应用程序的JAR包的作为参数
- LocalStreamExecution: 作为流式作业的本地执行环境,和LocalEnvironment一样在本地创建JVM中创建MiniCluster,并将流式作业运行在MiniCluster中
- StreamContextEnvironment:
用在Cli-Client客户端方式提交流式作业,且基于ContextEnvironment构建。 ContextEnvironment是ExecutionEnvironment的子类,ExecutionEnvironment是基于离线批计算的ExecutionEnvironment。用于以客户端命令行的方式提交Flink作业的场景
- StreamPlanEnvironment:
和OptimizerPlanEnvironment一样,用与获取Streamgraph结构,本身不参与作业的执行
3. StreamExecutionEnvironment组成结构
StreamExecutionEnvironment有以下部分
- contextEnvironmentFactory:在命令行模式提交作业时,当用户使用客户端提交作业时,就会通过contextEnvironmentFactory创建StreamExecutionEnvironment。类型是StreamExecutionEnvironmentFactory
- transformations: 一个tranformation的arraylist,用于构建StreamGraph对象
- defaultbackend:状态存储后端,默认实现为MemoryStateBackend
- executorServiceLoader:通过Java SPI加载PipelineExecutorFactory的实现类。然后生成PipelineExecutor
- UserClassLoader: 为用户编写代码提供的类加载器,当用户提交Flink应用程序的时候,userClassloader会加载jar文件中的所有类,并伴随应用程序的产生,构建,提交,运行全过程
- jobListener:在应用程序通过客户端提交到集群后,向客户端通过应用程序的执行结果。 例如作业执行状态等
4. flink任务提交与执行-Excution创建阶段
1.StreamExecutionEnvironment创建
- 基于上文可知,
在客户端提交的时候
,有PackagedProgram.callMainTheod方法最后会执行用户提交的flink的程序的main方法。同时,在创建和运行PackageProgram的时候,会同时创建ContextEnvironment - ContextEnvironment会使用ContextEnvironment.setAsContext(factory)方法将ContextEnvironmentFactory设定到ExecutionEnvironment的contextEnvironmentFactory和threadLocalContextEnvironmentFactory变量中,因此通过Packaged创建的contextEnvironmentFactory对StreamEnvironment进行初始化
- 由上可知,我们已经获取了contextEnvironmentFactory,具体的调用方法为StreamExecutionEnvironment.getExecutionEnvironment中创建ExecutionEnvironment,
- 在StreamExecutionEnvironment.getExecutionEnvironment中,主要是以下逻辑,先从threadLocalContextEnvironmentFactory, contextEnvironmentFactory中获取工厂类
- 然后利用工厂类获取StreamExecutionEnvironment,如果工厂类都为空,无法创建实例,则调用StreamExecutionEnvironment::createStreamExecutionEnvironment
- StreamExecutionEnvironment::createStreamExecutionEnvironment方法根据不同的ExecutionEnvironment env来创建不同的StreamExecutionEnvironment
2.生成StreamGraph
如图三所示为StreamGraph创建的流程图,以下为具体步骤:
- 由上文可知,PackagedProgram利用反射调用main方法,在main方法中执行StreamExecutionEnvironment.execute("jobname").
- 在StreamExecutionEnvironment.execute方法中,会进一步执行StreamExecutionEnvironment.getStreamGraph方法,获取StreamGraph
- 在StreamExecutionEnvironment.getStreamGraph方法中,调用StreamExecutionEnvironment.getStreamGraphGenerator方法获取StreamGraphGenerator
- StreamGraphGenerator的generate()方法生成StreamGraph
- 将StreamGraph返回StreamExecutionEnvironment
3.执行executeAsync方法将StreamGraph并生成jobGraph
- 在StreamExecutionEnvironment.execute方法中,收到getStreamGraph(jobname)返回来的SteamGraph,进一步执行该方法中的getStreamGraphGenerator.executeAsync方法,异步的触发整个job的执行
- 在getStreamGraphGenerator.executeAsync方法中会利用上面提及的executorServiceLoader来加载PipelineExecutorFactory的实现类,并且生成PipelineExecutorFactory
- PipelineExecutorFactory加载完成,调用PipelineExecutorFactory.getExecutor方法创建PipeLineExecutor.
- 接着调用PipeLineExecutor.execute()方法执行创建好的StreamGraph方法,然后会向StreamExecutionEnvironment异步返回JobClientFuture
- StreamExecutionEnvironment调用JobClientFuture.get()方法得到同步的JobClient对象
- jobClient将执行Job的结果返回给StreamExecutionEnvironment
- 最后使用上面提及的JobListeners来遍历JobListener获取对应的结果,返回给客户端
5. 一些比较重要的方法
1.StreamGrapthgenerator.tranform()
该方法主要是将tranform节点根据不同的Tranformation类型,选择不同的解析逻辑,然后将所有的Tranformation转关成StreamGraph中对应的节点,完成整个StreamGraph对象的构建
2.FlinkPipelineTranslationUtil.getJobGraph
该方法是在PipelineExecutor.excute中调用,主要是将Pipeline转换为JobGraph,由于StreamGraph是Pipeline的子类,因此本质就是streamGraph的转换.接着会调用到StreamGraph.getJobGraph,最后在StreamingJobGraphGenerator 对象中将StreamGraph对象转换为JobGraph数据结构,相对于StreamGraph,JobGraph增加了系统执行参数及依赖等信息,如作业依赖的JAR包
6. 参考文献
flink设计与实现:核心原理与源码解析