1. Introduction
Flink's code base is fairly large. Diving in head-first, it is easy to end up lost, not knowing where to start reading. It works much better to follow the source with a concrete goal from your own development work, tracking down a problem and understanding the execution details along the way.
In a recent Flink project, product constraints only allowed deploying Flink in standalone mode, and for performance reasons it was necessary to test how that mode holds up.
Deploying Flink in standalone mode is simple: set a few basic global configuration parameters, such as jobmanager.heap.size, taskmanager.heap.size, parallelism.default, and taskmanager.numberOfTaskSlots, then run ./bin/start-cluster.sh to start the standalone cluster.
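For reference, a minimal flink-conf.yaml for such a setup might look like the sketch below; the values are illustrative only, not tuned recommendations:

# flink-conf.yaml -- illustrative values only
jobmanager.heap.size: 2048m
taskmanager.heap.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4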
But when I executed:
./bin/flink run -c chx.demo.FirstDemo /demo/chx.jar
to submit my job, a problem surfaced. With a batch of 20 million records everything was fine, but when the batch reached 38 million records, an exception was thrown:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/$a#183984057]] after [10000ms]
Faced with this kind of error, it is worth getting familiar with Akka's mechanics first. This article does not cover Akka's principles and usage in depth, though; I plan to analyze and summarize Akka in a follow-up article.
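One quick observation before diving into the source: the [10000ms] in the stack trace matches Flink's akka.ask.timeout option, which defaults to 10 s. If the JobManager is simply slow to respond under a heavy batch, raising it in flink-conf.yaml is a common first mitigation (the value below is illustrative):

# flink-conf.yaml
akka.ask.timeout: 60 s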
The focus of this article is what the program actually does when we submit a job via ./bin/flink run: an analysis of the execution logic behind it.
2. Source Code Analysis
2.1. Analyzing the main entry point
When a job is submitted, execution enters the main function of CliFrontend in the org.apache.flink.client.cli package. Here is its code:
public static void main(final String[] args) {
    // print environment information to the log
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. find the configuration directory [FLINK_CONF_DIR]
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. load the global configuration
    final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
    // 3. load the custom command lines
    final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
        configuration,
        configurationDirectory);
    try {
        // 4. create the command line client object
        final CliFrontend cli = new CliFrontend(
            configuration,
            customCommandLines);
        // 5. install the security tooling
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // 6. run the main logic
        int retCode = SecurityUtils.getInstalledContext()
            .runSecured(() -> cli.parseParameters(args));
        System.exit(retCode);
    }
    catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
        System.exit(31);
    }
}
The main body of the entry point's work happens in step 6 (see the code comments). Let's follow the parseParameters method it invokes.
2.2. Parameter parsing logic
/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseParameters(String[] args) {
    // 1. check the arguments; if there are none, print the help message
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }
    // 2. the action to execute is the first argument, e.g. run, cancel, etc.
    String action = args[0];
    // 3. remove the action from the parameter list
    final String[] params = Arrays.copyOfRange(args, 1, args.length);
    try {
        // dispatch on the action
        switch (action) {
            // run a job
            case ACTION_RUN:
                run(params);
                return 0;
            // list all jobs
            case ACTION_LIST:
                list(params);
                return 0;
            // get information about a job
            case ACTION_INFO:
                info(params);
                return 0;
            // cancel a job
            case ACTION_CANCEL:
                cancel(params);
                return 0;
            // stop a job
            case ACTION_STOP:
                stop(params);
                return 0;
            // savepoint actions
            case ACTION_SAVEPOINT:
                savepoint(params);
                return 0;
            // modify a job
            case ACTION_MODIFY:
                modify(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                System.out.println();
                System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println("Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println("Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}
The logic of parseParameters is not complicated: it takes the client's command line arguments and dispatches on the first one, the action to perform. For our command, ./bin/flink run -c chx.demo.FirstDemo /demo/chx.jar, the action is run and the remaining arguments are handed to run(params).
2.3. Job submission (run) logic
Since we are submitting a job, i.e. performing ./bin/flink run, the method to focus on is run(params):
/**
 * Executes the run action.
 *
 * @param args command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");
    // 1. parse the command line arguments
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
    final RunOptions runOptions = new RunOptions(commandLine);
    // 2. check whether this is just a help request
    if (runOptions.isPrintHelp()) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }
    // 3. the path of the JAR to run must be given, i.e. the JAR containing our job logic
    if (runOptions.getJarFilePath() == null) {
        throw new CliArgsException("The program JAR file was not specified.");
    }
    // 4. build a packaged program instance
    final PackagedProgram program;
    try {
        LOG.info("Building program from JAR file");
        program = buildProgram(runOptions);
    }
    catch (FileNotFoundException e) {
        throw new CliArgsException("Could not build the program from JAR file.", e);
    }
    final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
    try {
        // 5. core logic of running the submitted job
        runProgram(customCommandLine, commandLine, runOptions, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}
Let's first look at step 4, building the packaged program instance:
program = buildProgram(runOptions);
2.4. Initializing the packaged program
Here is the logic of buildProgram; note that the entry point class below is the one we passed with -c (chx.demo.FirstDemo in our case):
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
    // 1. run parameters: program arguments, path of the job JAR, classpath URLs
    String[] programArgs = options.getProgramArgs();
    String jarFilePath = options.getJarFilePath();
    List<URL> classpaths = options.getClasspaths();
    // 2. the path of the JAR containing the job logic must not be null
    if (jarFilePath == null) {
        throw new IllegalArgumentException("The program JAR file was not specified.");
    }
    File jarFile = new File(jarFilePath);
    // 3. check that the JAR file exists and is a regular file
    if (!jarFile.exists()) {
        throw new FileNotFoundException("JAR file does not exist: " + jarFile);
    }
    else if (!jarFile.isFile()) {
        throw new FileNotFoundException("JAR file is not a file: " + jarFile);
    }
    // 4. the entry point class of the job JAR
    String entryPointClass = options.getEntryPointClassName();
    // 5. build the PackagedProgram instance; we focus on the case where the entry point is given
    PackagedProgram program = entryPointClass == null ?
        new PackagedProgram(jarFile, classpaths, programArgs) :
        new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
    // apply the savepoint restore settings
    program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
    return program;
}
/**
 * Creates an instance that wraps the plan defined in the JAR file, using the given
 * arguments. To generate the plan, the class named by the entryPointClassName argument is used.
 *
 * @param jarFile
 *        The JAR file containing the job plan.
 * @param classpaths
 *        Additional classpath URLs needed by the program.
 * @param entryPointClassName
 *        Name of the entry point class of the job JAR.
 * @param args
 *        Optional program arguments.
 * @throws ProgramInvocationException
 *         This invocation is thrown if the Program can't be properly loaded. Causes
 *         may be a missing / wrong class or manifest files.
 */
public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
    // 1. sanity checks on the job JAR: the path must exist, the JAR format must be valid, etc.
    if (jarFile == null) {
        throw new IllegalArgumentException("The jar file must not be null.");
    }
    URL jarFileUrl;
    try {
        jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
    } catch (MalformedURLException e1) {
        throw new IllegalArgumentException("The jar file path is invalid.");
    }
    checkJarFile(jarFileUrl);
    this.jarFile = jarFileUrl;
    this.args = args == null ? new String[0] : args;
    // 2. if no entry point was specified, scan the JAR to find a main class
    if (entryPointClassName == null) {
        entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
    }
    // 3. now that we have an entry point, we can extract the nested jar files
    this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
    this.classpaths = classpaths;
    this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
    // 4. load the entry point class
    this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
    // if the entry point is a Program, instantiate the class and get the plan
    if (Program.class.isAssignableFrom(this.mainClass)) {
        Program prg = null;
        try {
            prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
        } catch (Exception e) {
            // validate that the class has a main method at least.
            // the main method possibly instantiates the program properly
            if (!hasMainMethod(mainClass)) {
                throw new ProgramInvocationException("The given program class implements the " +
                    Program.class.getName() + " interface, but cannot be instantiated. " +
                    "It also declares no main(String[]) method as alternative entry point", e);
            }
        } catch (Throwable t) {
            throw new ProgramInvocationException("Error while trying to instantiate program class.", t);
        }
        this.program = prg;
    } else if (hasMainMethod(mainClass)) {
        this.program = null;
    } else {
        throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
            Program.class.getName() + " interface.");
    }
}
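The constructor thus distinguishes two kinds of entry points: classes implementing the legacy Program interface, and ordinary classes with a main(String[]) method. To make the branching concrete, here is a minimal sketch of both styles (the class names are made up for illustration; our chx.demo.FirstDemo would be of the first kind):

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

// Style 1: a plain main() entry point -- what most jobs use
public class MainStyleJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).print();
    }
}

// Style 2: implementing the Program interface and returning a Plan directly
public class PlanStyleJob implements Program {
    @Override
    public Plan getPlan(String... args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).output(new DiscardingOutputFormat<>());
        return env.createProgramPlan();
    }
}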
2.5. Running the program
Once the packaged program has been initialized, the real execution starts. Back to step 5 of the code in section 2.3:
runProgram(customCommandLine, commandLine, runOptions, program);
The main logic of running the job looks like this:
/**
 * Runs the job.
 */
private <T> void runProgram(
        CustomCommandLine<T> customCommandLine,
        CommandLine commandLine,
        RunOptions runOptions,
        PackagedProgram program) throws ProgramInvocationException, FlinkException {
    // 1. descriptor of the target cluster
    final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
    try {
        // 2. the cluster id
        final T clusterId = customCommandLine.getClusterId(commandLine);
        final ClusterClient<T> client;
        // 3. if the cluster is started in per-job mode and detached, deploy the job directly
        if (clusterId == null && runOptions.getDetachedMode()) {
            // 3.1 determine the parallelism
            int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
            // 3.2 create the job graph
            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
            // 3.3 cluster-specific settings
            final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
            // 3.4 deploy the job
            client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());
            logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
            try {
                // 3.5 shut down the client
                client.shutdown();
            } catch (Exception e) {
                LOG.info("Could not properly shut down the client.", e);
            }
        } else {
            // 4. in all other cases, how should the job be started?
            final Thread shutdownHook;
            if (clusterId != null) {
                client = clusterDescriptor.retrieve(clusterId);
                shutdownHook = null;
            } else {
                // also in job mode we have to deploy a session cluster because the job
                // might consist of multiple parts (e.g. when using ...)
                final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
                client = clusterDescriptor.deploySessionCluster(clusterSpecification);
                // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
                // there's a race-condition here if cli is killed before shutdown hook is installed
                if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
                    shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
                } else {
                    shutdownHook = null;
                }
            }
            try {
                client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
                client.setDetached(runOptions.getDetachedMode());
                LOG.debug("Client slots is set to {}", client.getMaxSlots());
                LOG.debug("{}", runOptions.getSavepointRestoreSettings());
                int userParallelism = runOptions.getParallelism();
                LOG.debug("User parallelism is set to {}", userParallelism);
                if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) {
                    logAndSysout("Using the parallelism provided by the remote cluster ("
                        + client.getMaxSlots() + "). "
                        + "To use another parallelism, set it at the ./bin/flink client.");
                    userParallelism = client.getMaxSlots();
                } else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
                    userParallelism = defaultParallelism;
                }
                // keep going: this is the part that executes the program
                executeProgram(program, client, userParallelism);
            } finally {
                if (clusterId == null && !client.isDetached()) {
                    // terminate the cluster only if we have started it before and if it's not detached
                    try {
                        client.shutDownCluster();
                    } catch (final Exception e) {
                        LOG.info("Could not properly terminate the Flink cluster.", e);
                    }
                    if (shutdownHook != null) {
                        // we do not need the hook anymore as we have just tried to shutdown the cluster.
                        ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
                    }
                }
                try {
                    client.shutdown();
                } catch (Exception e) {
                    LOG.info("Could not properly shut down the client.", e);
                }
            }
        }
    } finally {
        try {
            clusterDescriptor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the cluster descriptor.", e);
        }
    }
}
Moving on, the key part is executing the job program:
executeProgram(program, client, userParallelism);
2.6. Executing the program
Here is the code of executeProgram; this is where the client starts interacting with the JobManager.
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");
    // 1. submit the job to the JobManager through the client and return the submission result
    final JobSubmissionResult result = client.run(program, parallelism);
    if (null == result) {
        throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
            "ExecutionEnvironment.execute()");
    }
    if (result.isJobExecutionResult()) {
        logAndSysout("Program execution finished");
        JobExecutionResult execResult = result.getJobExecutionResult();
        System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
        System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
        Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
        if (accumulatorsResult.size() > 0) {
            System.out.println("Accumulator Results: ");
            System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
        }
    } else {
        logAndSysout("Job has been submitted with JobID " + result.getJobID());
    }
}
2.7. Job submission logic
Next comes the logic of client.run(program, parallelism):
/**
 * General purpose method to run a user-submitted job JAR in blocking or detached mode.
 * @param prog the packaged program
 * @param parallelism the parallelism of the Flink job
 * @return The result of the execution
 * @throws ProgramMissingJobException
 * @throws ProgramInvocationException
 */
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    // 1. set the context class loader
    Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
    // 2. the program specifies a program entry point (Program interface)
    if (prog.isUsingProgramEntryPoint()) {
        final JobWithJars jobWithJars;
        // 3. check whether the client already has all libraries needed by the job on its classpath
        if (hasUserJarsInClassPath(prog.getAllLibraries())) {
            // 3.1 all dependencies are already available
            jobWithJars = prog.getPlanWithoutJars();
        } else {
            // 3.2 otherwise, ship the jars along
            jobWithJars = prog.getPlanWithJars();
        }
        // 4. run the job
        return run(jobWithJars, parallelism, prog.getSavepointSettings());
    }
    // 5. the program uses interactive mode: its main() builds the plan via the execution environment
    else if (prog.isUsingInteractiveMode()) {
        log.info("Starting program in interactive mode (detached: {})", isDetached());
        final List<URL> libraries;
        if (hasUserJarsInClassPath(prog.getAllLibraries())) {
            libraries = Collections.emptyList();
        } else {
            libraries = prog.getAllLibraries();
        }
        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
            prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
            prog.getSavepointSettings());
        ContextEnvironment.setAsContext(factory);
        try {
            // invoke the main method
            prog.invokeInteractiveModeForExecution();
            if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                throw new ProgramMissingJobException("The program didn't contain a Flink job.");
            }
            if (isDetached()) {
                // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
                return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
            }
            else {
                // in blocking mode, we execute all Flink jobs contained in the user code and then return here
                return this.lastJobExecutionResult;
            }
        }
        finally {
            ContextEnvironment.unsetContext();
        }
    }
    else {
        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }
}
Continuing downward: the wait for the JobSubmissionResult here is blocking, and it is a likely source of our timeout error.
/**
 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
 * execution is complete, and returns afterwards.
 *
 * @param jobWithJars the program package to execute
 * @param parallelism the parallelism of the execution
 *
 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
 *                                    or if the submission failed. That might be either due to an I/O problem,
 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 *                                    parallel execution failed.
 */
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
        throws CompilerException, ProgramInvocationException {
    // 1. the class loader of the job program
    ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
    if (classLoader == null) {
        throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
    }
    // 2. optimize the plan
    OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
    // 3. execute it
    return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}
This mainly produces the optimized execution plan, which is then executed:
public JobSubmissionResult run(FlinkPlan compiledPlan,
        List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
        throws ProgramInvocationException {
    // 1. build the job graph for the job
    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
    // 2. submit the job
    return submitJob(job, classLoader);
}
So the code above generates the job graph and then submits the job.
2.8. Submitting the job
There are several job submission modes: FakeCluster, MiniCluster, RestCluster, Standalone, and YARN, and the submission logic differs between them. Since we deploy in Standalone mode, we only analyze that mode's submission path here.
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
        throws ProgramInvocationException {
    // 1. detached mode
    if (isDetached()) {
        return super.runDetached(jobGraph, classLoader);
    } else {
        // 2. non-detached mode
        return super.run(jobGraph, classLoader);
    }
}
Since our test job runs in non-detached mode, let's follow super.run(jobGraph, classLoader):
/**
 * Submits a JobGraph blocking.
 * @param jobGraph The JobGraph
 * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
 * @return JobExecutionResult
 * @throws ProgramInvocationException
 */
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    // 1. wait until the cluster environment is ready
    waitForClusterToBeReady();
    // 2. the actor system
    final ActorSystem actorSystem;
    // 3. obtain the actor system
    try {
        actorSystem = actorSystemLoader.get();
    } catch (FlinkException fe) {
        throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
            "JobManager.", jobGraph.getJobID(), fe);
    }
    try {
        // 4. submit the job and block until the result arrives
        logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
        this.lastJobExecutionResult = JobClient.submitJobAndWait(
            actorSystem,
            flinkConfig,
            highAvailabilityServices,
            jobGraph,
            timeout,
            printStatusDuringExecution,
            classLoader);
        return lastJobExecutionResult;
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e);
    }
}
Step 4 in the code comments is the main logic. Note that JobClient.submitJobAndWait receives a timeout parameter: this is akka.client.timeout, which defaults to 60 s and can be set in the Flink configuration file. Its value is directly related to our program's timeout error; it is essentially how long the client waits for the result to come back.
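If the cluster legitimately needs more time to process a large batch submission, both client-side timeouts can be raised in flink-conf.yaml. A sketch with illustrative values, not tuned recommendations:

# flink-conf.yaml
akka.client.timeout: 10 min
akka.ask.timeout: 60 s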
2.9. Submitting the job and waiting
/**
 * Sends a [[JobGraph]] to the JobClient actor specified by jobClient, which then forwards
 * the submission to the JobManager. The method blocks until the job has finished or the
 * JobManager is no longer alive. In the former case, a [[SerializedJobExecutionResult]] is
 * returned; in the latter case a [[JobExecutionException]] is thrown.
 *
 * @param actorSystem the actor system performing the communication
 * @param config the cluster configuration
 * @param highAvailabilityServices service factory for the high availability services
 * @param jobGraph the job graph of the Flink job
 * @param timeout timeout while waiting for the future results
 * @param sysoutLogUpdates if true, log updates are printed to standard out
 * @param classLoader the class loader used to deserialize the results
 * @return The job execution result
 * @throws JobExecutionException Thrown if the job
 *                               execution fails.
 */
public static JobExecutionResult submitJobAndWait(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        JobGraph jobGraph,
        FiniteDuration timeout,
        boolean sysoutLogUpdates,
        ClassLoader classLoader) throws JobExecutionException {
    // 1. the listening context of the job execution
    JobListeningContext jobListeningContext = submitJob(
        actorSystem,
        config,
        highAvailabilityServices,
        jobGraph,
        timeout,
        sysoutLogUpdates,
        classLoader);
    // 2. block until the execution result of the job arrives
    return awaitJobResult(jobListeningContext);
}
Let's continue with step 1 in the comments, the submitJob logic:
/**
 * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext.
 * The JobListeningContext can be passed to {@code awaitJobResult} to get the result of the submission.
 *
 * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
 *         {@code awaitJobResult(JobListeningContext context)}.
 */
public static JobListeningContext submitJob(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        JobGraph jobGraph,
        FiniteDuration timeout,
        boolean sysoutLogUpdates,
        ClassLoader classLoader) {
    // 1. null checks on the arguments
    checkNotNull(actorSystem, "The actorSystem must not be null.");
    checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
    checkNotNull(jobGraph, "The jobGraph must not be null.");
    checkNotNull(timeout, "The timeout must not be null.");
    // 2. for this job, we create a proxy JobClientActor that deals with all communication with
    //    the JobManager. It forwards the job submission, checks the success/failure responses,
    //    logs update messages, watches for disconnects between client and JobManager, etc.
    Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
        timeout,
        sysoutLogUpdates,
        config);
    ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    // 3. the future holding the result of the submission
    Future<Object> submissionFuture = Patterns.ask(
        jobClientActor,
        new JobClientMessages.SubmitJobAndWait(jobGraph),
        new Timeout(AkkaUtils.INF_TIMEOUT()));
    // 4. return the listening context of the job execution
    return new JobListeningContext(
        jobGraph.getJobID(),
        submissionFuture,
        jobClientActor,
        timeout,
        classLoader,
        highAvailabilityServices);
}
Let's first look at step 3 above, the Patterns.ask call; this is where Akka enters the picture (the code is written in Scala). Note that this submission future is created with AkkaUtils.INF_TIMEOUT(), so the [10000ms] in our original error most likely comes from the internal asks the client actor sends to the JobManager, which are governed by akka.ask.timeout.
/**
 * Sends a message asynchronously and returns a scala.concurrent.Future holding the eventual
 * reply message; this means that the target actor needs to send the result to the provided
 * 'sender' reference.
 * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the given
 * timeout has expired; this is independent from any timeout applied while awaiting a result
 * of that future (i.e. in `Await.result(..., timeout)`).
 */
def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]]
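For intuition, here is a minimal, self-contained sketch of the ask pattern using Akka's Java API (assuming Akka 2.5+ classic actors; the actor and message are made up for illustration). If no reply arrives within the timeout, the returned future fails with an AskTimeoutException, exactly the exception we saw when submitting the large batch job:

import java.util.concurrent.TimeUnit;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

public class AskDemo {
    // A trivial actor that replies to any message it receives.
    static class EchoActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .matchAny(msg -> getSender().tell("echo: " + msg, getSelf()))
                .build();
        }
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef echo = system.actorOf(Props.create(EchoActor.class));

        // ask() returns immediately with a Future; the reply must arrive
        // within the timeout or the Future fails with AskTimeoutException
        Timeout timeout = new Timeout(Duration.create(10, TimeUnit.SECONDS));
        Future<Object> reply = Patterns.ask(echo, "hello", timeout);

        // block until the reply is available (as JobClient.awaitJobResult does)
        System.out.println(Await.result(reply, timeout.duration()));
        system.terminate();
    }
}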
2.10. Waiting for the asynchronous result
Back to the blocking wait from section 2.9: return awaitJobResult(jobListeningContext).
/**
 * Given a JobListeningContext, awaits the result of the job execution.
 * @param listeningContext the listening context of the job execution
 * @return the result of the execution
 * @throws JobExecutionException if anything goes wrong while monitoring the job
 */
public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
    // the JobID
    final JobID jobID = listeningContext.getJobID();
    // the client actor
    final ActorRef jobClientActor = listeningContext.getJobClientActor();
    // the future holding the execution result
    final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
    // the timeout of the listening context
    final FiniteDuration askTimeout = listeningContext.getTimeout();
    // retrieve the class loader if necessary
    final ClassLoader classLoader = listeningContext.getClassLoader();
    // wait for the future which holds the result, pinging the JobClientActor from time to time
    // to check whether it is still alive
    while (!jobSubmissionFuture.isCompleted()) {
        try {
            Await.ready(jobSubmissionFuture, askTimeout);
        } catch (InterruptedException e) {
            throw new JobExecutionException(
                jobID,
                "Interrupted while waiting for job completion.");
        } catch (TimeoutException e) {
            try {
                Await.result(
                    Patterns.ask(
                        jobClientActor,
                        // Ping the Actor to see if it is alive
                        new Identify(true),
                        Timeout.durationToTimeout(askTimeout)),
                    askTimeout);
                // we got a reply, continue waiting for the job result
            } catch (Exception eInner) {
                // we could have a result but the JobClientActor might have been killed and
                // thus the health check failed
                if (!jobSubmissionFuture.isCompleted()) {
                    throw new JobExecutionException(
                        jobID,
                        "JobClientActor seems to have died before the JobExecutionResult could be retrieved.",
                        eInner);
                }
            }
        }
    }
    final Object answer;
    try {
        // we have already awaited the result, zero time to wait here
        answer = Await.result(jobSubmissionFuture, Duration.Zero());
    }
    catch (Throwable throwable) {
        throw new JobExecutionException(jobID,
            "Couldn't retrieve the JobExecutionResult from the JobManager.", throwable);
    }
    finally {
        // failsafe shutdown of the client actor
        jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
    }
    // second block handles the actual response
    if (answer instanceof JobManagerMessages.JobResultSuccess) {
        LOG.info("Job execution complete");
        SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
        if (result != null) {
            try {
                return result.toJobExecutionResult(classLoader);
            } catch (Throwable t) {
                throw new JobExecutionException(jobID,
                    "Job was successfully executed but JobExecutionResult could not be deserialized.");
            }
        } else {
            throw new JobExecutionException(jobID,
                "Job was successfully executed but result contained a null JobExecutionResult.");
        }
    }
    else if (answer instanceof JobManagerMessages.JobResultFailure) {
        LOG.info("Job execution failed");
        SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
        if (serThrowable != null) {
            Throwable cause = serThrowable.deserializeError(classLoader);
            if (cause instanceof JobExecutionException) {
                throw (JobExecutionException) cause;
            } else {
                throw new JobExecutionException(jobID, "Job execution failed", cause);
            }
        } else {
            throw new JobExecutionException(jobID,
                "Job execution failed with null as failure cause.");
        }
    }
    else if (answer instanceof JobManagerMessages.JobNotFound) {
        throw new JobRetrievalException(
            ((JobManagerMessages.JobNotFound) answer).jobID(),
            "Couldn't retrieve Job " + jobID + " because it was not running.");
    }
    else {
        throw new JobExecutionException(jobID,
            "Unknown answer from JobManager after submitting the job: " + answer);
    }
}
The detailed logic inside this method will be analyzed in a follow-up article.