RestClusterClient提交任务源码分析
RestClusterClient和MiniClusterClient
MiniClusterClient负责本地提交flink任务
RestClusterClient通过 HTTP REST 请求进行提交任务
submitJob提交任务
submitJob分为几个主要的步骤:
- 异步创建临时文件,用来存储jobgraph
- 将jobgraph依赖的一些文件信息(文件名,文件路径)缓存到集合中,然后封装成JobSubmitRequestBody
- JobSubmitRequestBody异步调用 sendRetriableRequest()方法,返回一个JobSubmitResponseBody对象,JobSubmitResponseBody中包含可以从中检索作业状态的 URL
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
CompletableFuture.supplyAsync(
() -> {
try {
final java.nio.file.Path jobGraphFile =
Files.createTempFile("flink-jobgraph", ".bin");
try (ObjectOutputStream objectOut =
new ObjectOutputStream(
Files.newOutputStream(jobGraphFile))) {
objectOut.writeObject(jobGraph);
}
return jobGraphFile;
} catch (IOException e) {
throw new CompletionException(
new FlinkException("Failed to serialize JobGraph.", e));
}
},
executorService);
CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
jobGraphFileFuture.thenApply(
jobGraphFile -> {
List<String> jarFileNames = new ArrayList<>(8);
List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
new ArrayList<>(8);
Collection<FileUpload> filesToUpload = new ArrayList<>(8);
filesToUpload.add(
new FileUpload(
jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
for (Path jar : jobGraph.getUserJars()) {
jarFileNames.add(jar.getName());
filesToUpload.add(
new FileUpload(
Paths.get(jar.toUri()),
RestConstants.CONTENT_TYPE_JAR));
}
// DistributedCache 提供静态方法将注册的缓存文件写入作业配置或从作业配置中解码它们。
for (Map.Entry<String, DistributedCache.DistributedCacheEntry>
//遍历jobgraph中用户定义的jar文件
artifacts : jobGraph.getUserArtifacts().entrySet()) {
final Path artifactFilePath =
new Path(artifacts.getValue().filePath);
try {
// Only local artifacts need to be uploaded.
if (!artifactFilePath.getFileSystem().isDistributedFS()) {
artifactFileNames.add(
new JobSubmitRequestBody.DistributedCacheFile(
artifacts.getKey(),
artifactFilePath.getName()));
filesToUpload.add(
new FileUpload(
Paths.get(artifacts.getValue().filePath),
RestConstants.CONTENT_TYPE_BINARY));
}
} catch (IOException e) {
throw new CompletionException(
new FlinkException(
"Failed to get the FileSystem of artifact "
+ artifactFilePath
+ ".",
e));
}
}
final JobSubmitRequestBody requestBody =
new JobSubmitRequestBody(
jobGraphFile.getFileName().toString(),
jarFileNames,
artifactFileNames);
return Tuple2.of(
requestBody, Collections.unmodifiableCollection(filesToUpload));
});
//对提交作业的响应,包含可以从中检索作业状态的 URL。
final CompletableFuture<JobSubmitResponseBody> submissionFuture =
requestFuture.thenCompose(
requestAndFileUploads ->
sendRetriableRequest(
JobSubmitHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
requestAndFileUploads.f0,
requestAndFileUploads.f1,
isConnectionProblemOrServiceUnavailable()));
submissionFuture
.thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
.thenAccept(
jobGraphFile -> {
try {
Files.delete(jobGraphFile);
} catch (IOException e) {
LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
}
});
return submissionFuture
.thenApply(ignore -> jobGraph.getJobID())
.exceptionally(
(Throwable throwable) -> {
throw new CompletionException(
new JobSubmissionException(
jobGraph.getJobID(),
"Failed to submit JobGraph.",
ExceptionUtils.stripCompletionException(throwable)));
});
}
sendRetriableRequest方法
sendRetriableRequest方法是RestClusterClient的一个私有方法,支持失败重试。此方法重点做了以下事情。
- getWebMonitorBaseUrl()获取jobmanager的通信地址
- 通过sendRequest方法发送请求给jobmanager
private <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRetriableRequest(
M messageHeaders,
U messageParameters,
R request,
Collection<FileUpload> filesToUpload,
Predicate<Throwable> retryPredicate) {
return retry(
() ->
getWebMonitorBaseUrl()
.thenCompose(
webMonitorBaseUrl -> {
try {
return restClient.sendRequest(
webMonitorBaseUrl.getHost(),
webMonitorBaseUrl.getPort(),
messageHeaders,
messageParameters,
request,
filesToUpload);
} catch (IOException e) {
throw new CompletionException(e);
}
}),
retryPredicate);
}
sendRequest方法
sendRequest方法的作用:
- 对jobmanager ip地址,端口号,请求参数,jobgraph文件进行校验。
- 将ip地址,端口号,请求参数,jobgraph文件信息封装到可以与Netty通信的Request对象中。
- 通过submitRequest方法继续提交请求
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(
String targetAddress,
int targetPort,
M messageHeaders,
U messageParameters,
R request,
Collection<FileUpload> fileUploads,
RestAPIVersion apiVersion)
throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(
NetUtils.isValidHostPort(targetPort),
"The target port " + targetPort + " is not in the range [0, 65535].");
Preconditions.checkNotNull(messageHeaders);
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(messageParameters);
Preconditions.checkNotNull(fileUploads);
Preconditions.checkState(
messageParameters.isResolved(), "Message parameters were not resolved.");
if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
throw new IllegalArgumentException(
String.format(
"The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
apiVersion,
messageHeaders.getHttpMethod(),
messageHeaders.getTargetRestEndpointURL(),
messageHeaders.getSupportedAPIVersions().stream()
.map(RestAPIVersion::getURLVersionPrefix)
.collect(Collectors.joining(","))));
}
String versionedHandlerURL =
"/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
LOG.debug(
"Sending request of class {} to {}:{}{}",
request.getClass(),
targetAddress,
targetPort,
targetUrl);
// serialize payload
StringWriter sw = new StringWriter();
objectMapper.writeValue(sw, request);
ByteBuf payload =
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
Request httpRequest =
createRequest(
targetAddress + ':' + targetPort,
targetUrl,
messageHeaders.getHttpMethod().getNettyHttpMethod(),
payload,
fileUploads);
final JavaType responseType;
final Collection<Class<?>> typeParameters = messageHeaders.getResponseTypeParameters();
if (typeParameters.isEmpty()) {
responseType = objectMapper.constructType(messageHeaders.getResponseClass());
} else {
responseType =
objectMapper
.getTypeFactory()
.constructParametricType(
messageHeaders.getResponseClass(),
typeParameters.toArray(new Class<?>[typeParameters.size()]));
}
return submitRequest(targetAddress, targetPort, httpRequest, responseType);
}
submitRequest方法
submitRequest方法通过Netty的channel与jobmanager进行远程通信,主要就是将包装好的httpRequest请求写入Netty channel中完成远程异步请求。
private <P extends ResponseBody> CompletableFuture<P> submitRequest(
String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
connectFuture.addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
channelFuture.complete(future.channel());
} else {
channelFuture.completeExceptionally(future.cause());
}
});
return channelFuture
.thenComposeAsync(
channel -> {
ClientHandler handler = channel.pipeline().get(ClientHandler.class);
CompletableFuture<JsonResponse> future;
boolean success = false;
try {
if (handler == null) {
throw new IOException(
"Netty pipeline was not properly initialized.");
} else {
httpRequest.writeTo(channel);
future = handler.getJsonFuture();
success = true;
}
} catch (IOException e) {
future =
FutureUtils.completedExceptionally(
new ConnectionException(
"Could not write request.", e));
} finally {
if (!success) {
channel.close();
}
}
return future;
},
executor)
.thenComposeAsync(
(JsonResponse rawResponse) -> parseResponse(rawResponse, responseType),
executor);
}