Checkpoint call chain analysis
JobMaster.triggerSavepoint
When the JobMaster triggers a savepoint, it goes through the checkpoint machinery. It delegates to schedulerNG, the interface responsible for scheduling Flink jobs.
//Parameters: the savepoint target directory, whether to cancel the job after the savepoint, and an RPC timeout
@Override
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}
Next, let's look at SchedulerBase, an implementation of SchedulerNG.
SchedulerBase.triggerSavepoint
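Main flow:
- Get the checkpoint coordinator (checkpointCoordinator) from the executionGraph; fail if there is none (not a streaming job), or if no target directory is given and no default savepoint directory is configured
- If the job is to be cancelled, stop the periodic checkpoint scheduler
- Trigger a savepoint
- If the savepoint fails and the job was to be cancelled, restart the checkpoint scheduler and rethrow the exception
- If the savepoint succeeds and the job is to be cancelled, cancel the job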
@Override
public CompletableFuture<String> triggerSavepoint(
final String targetDirectory, final boolean cancelJob) {
mainThreadExecutor.assertRunningInMainThread();
//Get the CheckpointCoordinator
final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
if (checkpointCoordinator == null) {
throw new IllegalStateException(
String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
} else if (targetDirectory == null
&& !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
log.info(
"Trying to cancel job {} with savepoint, but no savepoint directory configured.",
jobGraph.getJobID());
throw new IllegalStateException(
"No savepoint directory configured. You can either specify a directory "
+ "while cancelling via -s :targetDirectory or configure a cluster-wide "
+ "default via key '"
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+ "'.");
}
log.info(
"Triggering {}savepoint for job {}.",
cancelJob ? "cancel-with-" : "",
jobGraph.getJobID());
//If the job will be cancelled, stop scheduling periodic checkpoints
if (cancelJob) {
stopCheckpointScheduler();
}
//Trigger a savepoint (essentially a checkpoint with aligned barriers), then return the external path where it was stored
return checkpointCoordinator
.triggerSavepoint(targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer)
.handleAsync(
(path, throwable) -> {
if (throwable != null) {
if (cancelJob) {
startCheckpointScheduler();
}
throw new CompletionException(throwable);
} else if (cancelJob) {
log.info(
"Savepoint stored in {}. Now cancelling {}.",
path,
jobGraph.getJobID());
cancel();
}
return path;
},
mainThreadExecutor);
}
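The following minimal sketch makes this control flow concrete. It makes several assumptions. The class SavepointFlowSketch, its two boolean fields, and the simulate helper are hypothetical stand-ins for the scheduler, stopCheckpointScheduler/startCheckpointScheduler, and cancel(). A single-thread executor plays the role of mainThreadExecutor. The sketch only reproduces the handleAsync pattern: restart the scheduler on failure, cancel the job on success.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified model of SchedulerBase.triggerSavepoint's control flow.
class SavepointFlowSketch {
    private volatile boolean schedulerRunning = true; // stands in for periodic checkpointing
    private volatile boolean jobCancelled = false;    // stands in for the job being cancelled

    CompletableFuture<String> triggerSavepoint(
            CompletableFuture<String> savepointPath, boolean cancelJob, ExecutorService mainThread) {
        if (cancelJob) {
            schedulerRunning = false; // like stopCheckpointScheduler()
        }
        return savepointPath.handleAsync(
                (path, throwable) -> {
                    if (throwable != null) {
                        if (cancelJob) {
                            schedulerRunning = true; // failure: resume periodic checkpoints
                        }
                        throw new CompletionException(throwable);
                    } else if (cancelJob) {
                        jobCancelled = true; // success: like cancel()
                    }
                    return path;
                },
                mainThread);
    }

    // Runs one scenario and summarizes the outcome as a string.
    static String simulate(boolean savepointSucceeds, boolean cancelJob) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            SavepointFlowSketch scheduler = new SavepointFlowSketch();
            CompletableFuture<String> savepoint = new CompletableFuture<>();
            if (savepointSucceeds) {
                savepoint.complete("/savepoints/sp-1");
            } else {
                savepoint.completeExceptionally(new IllegalStateException("savepoint failed"));
            }
            String result;
            try {
                result = scheduler.triggerSavepoint(savepoint, cancelJob, mainThread).join();
            } catch (CompletionException e) {
                result = "failed";
            }
            return result + " scheduler=" + scheduler.schedulerRunning
                    + " cancelled=" + scheduler.jobCancelled;
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(true, true));
        System.out.println(simulate(false, true));
    }
}
```

The scenario simulate(false, true) shows why the scheduler is restarted. A failed cancel-with-savepoint should leave the job running with periodic checkpoints enabled.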
CheckpointCoordinator
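CheckpointCoordinator coordinates the distributed snapshots of all operators and their state. It sends messages to the relevant tasks to trigger the snapshot and then collects their acknowledgements (acks) of successful snapshots.
CheckpointCoordinator.createActivatorDeactivator creates a job status listener that reacts to changes of the job status.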
//Listen for job status changes to start or stop periodic checkpointing
public JobStatusListener createActivatorDeactivator() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
if (jobStatusListener == null) {
jobStatusListener = new CheckpointCoordinatorDeActivator(this);
}
return jobStatusListener;
}
}
JobStatusListener is an interface. Its implementation here is CheckpointCoordinatorDeActivator, whose jobStatusChanges method looks like this:
//When the job switches to RUNNING, start periodic checkpoint scheduling; any other status stops it
@Override
public void jobStatusChanges(
JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
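Here is a minimal sketch of the same idea. It assumes a hypothetical FakeCoordinator and a reduced JobStatus enum, neither of which is a Flink class. The listener does nothing except translate status changes into start/stop calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the CheckpointCoordinatorDeActivator idea.
class DeActivatorSketch {
    // Reduced status set, for illustration only.
    enum JobStatus { CREATED, RUNNING, FAILING, CANCELED, FINISHED }

    interface JobStatusListener {
        void jobStatusChanges(JobStatus newJobStatus);
    }

    // Stand-in for CheckpointCoordinator's start/stopCheckpointScheduler().
    static final class FakeCoordinator {
        boolean periodicScheduling = false;
        final List<String> calls = new ArrayList<>();

        void startCheckpointScheduler() {
            periodicScheduling = true;
            calls.add("start");
        }

        void stopCheckpointScheduler() {
            periodicScheduling = false;
            calls.add("stop");
        }
    }

    static JobStatusListener createActivatorDeactivator(FakeCoordinator coordinator) {
        return newStatus -> {
            if (newStatus == JobStatus.RUNNING) {
                coordinator.startCheckpointScheduler();
            } else {
                // anything other than RUNNING stops the periodic trigger
                coordinator.stopCheckpointScheduler();
            }
        };
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        JobStatusListener listener = createActivatorDeactivator(coordinator);
        listener.jobStatusChanges(JobStatus.RUNNING);
        listener.jobStatusChanges(JobStatus.FAILING);
        System.out.println(coordinator.calls + " periodic=" + coordinator.periodicScheduling);
    }
}
```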
Next, startCheckpointScheduler:
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
Preconditions.checkState(
isPeriodicCheckpointingConfigured(),
"Can not start checkpoint scheduler, if no periodic checkpointing is configured");
// make sure all prior timers are cancelled
stopCheckpointScheduler();
//Create a new periodic trigger; the initial delay is a random value between the minimum pause
//between checkpoints (inclusive) and the checkpoint interval + 1 (exclusive)
periodicScheduling = true;
currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
}
}
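The initial delay can be sketched as follows. This assumes the delay is drawn with ThreadLocalRandom.nextLong(origin, bound). That method includes origin and excludes bound, which explains the "+ 1" in the comment above. InitialDelaySketch and randomInitDelay are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: draw an initial delay in [minPauseBetweenCheckpoints, baseInterval].
class InitialDelaySketch {
    static long randomInitDelay(long minPauseBetweenCheckpoints, long baseInterval) {
        if (minPauseBetweenCheckpoints > baseInterval) {
            throw new IllegalArgumentException("min pause must not exceed the checkpoint interval");
        }
        // nextLong(origin, bound): origin inclusive, bound exclusive, hence baseInterval + 1
        return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println(randomInitDelay(500L, 10_000L));
        }
    }
}
```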
scheduleTriggerWithDelay starts a timer that fires at a fixed rate. The logic executed on each tick lives in ScheduledTrigger, an inner class of CheckpointCoordinator.
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
return timer.scheduleAtFixedRate(
new ScheduledTrigger(), initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
try {
triggerCheckpoint(true);
} catch (Exception e) {
LOG.error("Exception while triggering checkpoint for job {}.", job, e);
}
}
}
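This sketch shows why ScheduledTrigger catches every exception. With the JDK's ScheduledExecutorService.scheduleAtFixedRate, an exception thrown by one run suppresses all subsequent runs. The sketch assumes Flink's timer behaves like the JDK executor in this respect, and FixedRateTriggerSketch is hypothetical. It schedules a trigger that always fails and checks whether the trigger keeps firing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: does a fixed-rate trigger keep firing although every run fails?
class FixedRateTriggerSketch {
    static boolean keepsFiring(boolean catchExceptions, int expectedRuns) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch runs = new CountDownLatch(expectedRuns);
        // Stand-in for triggerCheckpoint(true) failing on every call.
        Runnable failingTrigger = () -> {
            runs.countDown();
            throw new IllegalStateException("trigger failed");
        };
        // Like ScheduledTrigger.run(): catch (Flink also logs) instead of propagating.
        Runnable task = catchExceptions
                ? () -> {
                    try {
                        failingTrigger.run();
                    } catch (Exception e) {
                        // swallowed so that the next scheduled run still happens
                    }
                }
                : failingTrigger;
        ScheduledFuture<?> future = timer.scheduleAtFixedRate(task, 0L, 10L, TimeUnit.MILLISECONDS);
        try {
            // true only if the task ran expectedRuns times within one second
            return runs.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(false);
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("with try/catch: " + keepsFiring(true, 3));
        System.out.println("without try/catch: " + keepsFiring(false, 3));
    }
}
```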
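ScheduledTrigger calls triggerCheckpoint(true). triggerCheckpoint wraps the call in a CheckpointTriggerRequest. Once that request is chosen for execution, it is passed to startTriggeringCheckpoint: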
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
try {
synchronized (lock) {
preCheckGlobalState(request.isPeriodic);
}
// we will actually trigger this checkpoint!
Preconditions.checkState(!isTriggering);
isTriggering = true;
final long timestamp = System.currentTimeMillis();
//Calculate the checkpoint plan: which tasks to trigger, which tasks to wait for (acks), and which tasks to commit to
CompletableFuture<CheckpointPlan> checkpointPlanFuture =
checkpointPlanCalculator.calculateCheckpointPlan();
final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
checkpointPlanFuture
.thenApplyAsync(
plan -> {
try {
CheckpointIdAndStorageLocation
checkpointIdAndStorageLocation =
initializeCheckpoint(
request.props,
request.externalSavepointLocation);
return new Tuple2<>(
plan, checkpointIdAndStorageLocation);
} catch (Throwable e) {
throw new CompletionException(e);
}
},
executor)
.thenApplyAsync(
(checkpointInfo) ->
//A PendingCheckpoint has been started but not yet acknowledged by all tasks that need to acknowledge it. Once all of them have, it becomes a CompletedCheckpoint.
createPendingCheckpoint(
timestamp,
request.props,
checkpointInfo.f0,
request.isPeriodic,
checkpointInfo.f1.checkpointId,
checkpointInfo.f1.checkpointStorageLocation,
request.getOnCompletionFuture()),
timer);
final CompletableFuture<?> coordinatorCheckpointsComplete =
pendingCheckpointCompletableFuture.thenComposeAsync(
(pendingCheckpoint) ->
OperatorCoordinatorCheckpoints
//Trigger all OperatorCoordinator checkpoints and wait for their acknowledgements
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint,
pendingCheckpoint,
timer),
timer);
//After the coordinator checkpoints complete, the master hooks are snapshotted. A MasterHook lets external systems be prepared before a checkpoint is taken or restored.
// We have to take the snapshot of the master hooks after the coordinator checkpoints
// has completed.
// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
// ExternallyInducedSource is used.
final CompletableFuture<?> masterStatesComplete =
coordinatorCheckpointsComplete.thenComposeAsync(
ignored -> {
// If the code reaches here, the pending checkpoint is guaranteed to
// be not null.
// We use FutureUtils.getWithoutException() to make compiler happy
// with checked exceptions in the signature.
PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(
pendingCheckpointCompletableFuture);
return snapshotMasterState(checkpoint);
},
timer);
FutureUtils.assertNoException(
CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
.handleAsync(
(ignored, throwable) -> {
final PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(
pendingCheckpointCompletableFuture);
Preconditions.checkState(
checkpoint != null || throwable != null,
"Either the pending checkpoint needs to be created or an error must have occurred.");
if (throwable != null) {
// the initialization might not be finished yet
if (checkpoint == null) {
onTriggerFailure(request, throwable);
} else {
onTriggerFailure(checkpoint, throwable);
}
} else {
//Send the checkpoint trigger requests to the tasks
triggerCheckpointRequest(
request, timestamp, checkpoint);
}
return null;
},
timer)
.exceptionally(
error -> {
if (!isShutdown()) {
throw new CompletionException(error);
} else if (findThrowable(
error, RejectedExecutionException.class)
.isPresent()) {
LOG.debug("Execution rejected during shutdown");
} else {
LOG.warn("Error encountered during shutdown", error);
}
return null;
}));
} catch (Throwable throwable) {
onTriggerFailure(request, throwable);
}
}
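The ordering of the future stages above can be modelled with plain CompletableFutures. In this hypothetical sketch (TriggerPipelineSketch), strings replace CheckpointPlan and PendingCheckpoint, and each stage only appends to a log. The log shows two things. The master hooks run only after the coordinator checkpoints have succeeded. A failure at any stage ends in the failure branch instead of triggering the tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the stage ordering in startTriggeringCheckpoint.
class TriggerPipelineSketch {
    static List<String> run(boolean coordinatorsFail) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            // calculateCheckpointPlan()
            CompletableFuture<String> plan =
                    CompletableFuture.supplyAsync(() -> { log.add("plan"); return "plan"; }, timer);
            // initializeCheckpoint(...), then createPendingCheckpoint(...)
            CompletableFuture<String> pending = plan
                    .thenApplyAsync(p -> { log.add("init id+location"); return p + "|id=1"; }, timer)
                    .thenApplyAsync(info -> { log.add("create pending"); return "pending(" + info + ")"; }, timer);
            // OperatorCoordinator checkpoints
            CompletableFuture<Void> coordinators = pending.thenComposeAsync(pc -> {
                log.add("coordinator checkpoints");
                return coordinatorsFail
                        ? CompletableFuture.<Void>failedFuture(new IllegalStateException("coordinator failed"))
                        : CompletableFuture.<Void>completedFuture(null);
            }, timer);
            // master hooks run only after the coordinator checkpoints succeeded
            CompletableFuture<Void> masterStates = coordinators.thenComposeAsync(ignored -> {
                log.add("master hooks");
                return CompletableFuture.<Void>completedFuture(null);
            }, timer);
            // either trigger the tasks or handle the failure
            CompletableFuture.allOf(masterStates, coordinators)
                    .handleAsync((ignored, throwable) -> {
                        log.add(throwable == null ? "trigger tasks" : "trigger failure");
                        return null;
                    }, timer)
                    .join();
        } finally {
            timer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```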
startTriggeringCheckpoint eventually calls triggerCheckpointRequest, which in turn calls triggerTasks:
private CompletableFuture<Void> triggerTasks(
CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
// no exception, no discarding, everything is OK
final long checkpointId = checkpoint.getCheckpointID();
final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
request.props.getCheckpointType(),
checkpoint.getCheckpointStorageLocation().getLocationReference(),
isExactlyOnceMode,
unalignedCheckpointsEnabled,
alignedCheckpointTimeout);
// send messages to the tasks to trigger their checkpoints
List<CompletableFuture<Acknowledge>> acks = new ArrayList<>();
for (Execution execution : checkpoint.getCheckpointPlan().getTasksToTrigger()) {
if (request.props.isSynchronous()) {
acks.add(
execution.triggerSynchronousSavepoint(
checkpointId, timestamp, checkpointOptions));
} else {
acks.add(execution.triggerCheckpoint(checkpointId, timestamp, checkpointOptions));
}
}
return FutureUtils.waitForAll(acks);
}
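triggerTasks takes the Executions of the tasks to trigger from the checkpoint plan. It sends each of them a trigger request (a synchronous savepoint or a regular checkpoint) and collects the returned acknowledgement futures. An Execution represents one execution attempt of a task vertex.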
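Here is a stripped-down sketch of the fan-out in triggerTasks. It assumes a hypothetical FakeExecution that records the calls it receives and acknowledges immediately. CompletableFuture.allOf stands in for FutureUtils.waitForAll. These futures acknowledge only the trigger request. The acknowledgements that complete the checkpoint arrive later through acknowledgeCheckpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the fan-out in triggerTasks.
class TriggerTasksSketch {
    // Stand-in for Execution: records each request and acknowledges it immediately.
    static final class FakeExecution {
        final List<String> received = new ArrayList<>();

        CompletableFuture<String> triggerCheckpoint(long checkpointId) {
            received.add("checkpoint-" + checkpointId);
            return CompletableFuture.completedFuture("ack");
        }

        CompletableFuture<String> triggerSynchronousSavepoint(long checkpointId) {
            received.add("sync-savepoint-" + checkpointId);
            return CompletableFuture.completedFuture("ack");
        }
    }

    static CompletableFuture<Void> triggerTasks(
            List<FakeExecution> tasksToTrigger, long checkpointId, boolean synchronous) {
        List<CompletableFuture<String>> acks = new ArrayList<>();
        for (FakeExecution execution : tasksToTrigger) {
            acks.add(synchronous
                    ? execution.triggerSynchronousSavepoint(checkpointId)
                    : execution.triggerCheckpoint(checkpointId));
        }
        // completes once every task has acknowledged the trigger request
        return CompletableFuture.allOf(acks.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        FakeExecution source = new FakeExecution();
        FakeExecution sink = new FakeExecution();
        triggerTasks(List.of(source, sink), 42L, false).join();
        System.out.println(source.received + " " + sink.received);
    }
}
```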
Below is the entry point on the Execution side, Execution.triggerCheckpoint:
public CompletableFuture<Acknowledge> triggerCheckpoint(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
return triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
}
private CompletableFuture<Acknowledge> triggerCheckpointHelper(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
&& !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
//Get the slot assigned to this execution
final LogicalSlot slot = assignedResource;
if (slot != null) {
//The TaskManagerGateway is used by the JobManager to communicate with the TaskManager hosting this slot
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
//Ask the TaskManager to trigger the checkpoint for this execution attempt
return taskManagerGateway.triggerCheckpoint(
attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
}
LOG.debug(
"The execution has no slot assigned. This indicates that the execution is no longer running.");
return CompletableFuture.completedFuture(Acknowledge.get());
}
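The precondition at the start of triggerCheckpointHelper is repeated in TaskExecutor.triggerCheckpoint. It requires that a checkpoint which terminates the job afterwards be a synchronous savepoint. Below is a small truth-table sketch that uses hypothetical stand-ins for CheckpointType and PostCheckpointAction, not Flink's actual classes.

```java
// Hypothetical stand-ins for Flink's CheckpointType / PostCheckpointAction.
class TerminateCheckSketch {
    enum PostCheckpointAction { NONE, TERMINATE }

    static final class CheckpointType {
        final boolean savepoint;
        final boolean synchronous;
        final PostCheckpointAction postCheckpointAction;

        CheckpointType(boolean savepoint, boolean synchronous, PostCheckpointAction action) {
            this.savepoint = savepoint;
            this.synchronous = synchronous;
            this.postCheckpointAction = action;
        }
    }

    // Same condition as above: TERMINATE requires a synchronous savepoint.
    static void validate(CheckpointType type) {
        if (type.postCheckpointAction == PostCheckpointAction.TERMINATE
                && !(type.synchronous && type.savepoint)) {
            throw new IllegalArgumentException(
                    "Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
    }

    static boolean isAllowed(boolean savepoint, boolean synchronous, PostCheckpointAction action) {
        try {
            validate(new CheckpointType(savepoint, synchronous, action));
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (boolean savepoint : new boolean[] {false, true}) {
            for (boolean synchronous : new boolean[] {false, true}) {
                System.out.printf("savepoint=%b synchronous=%b TERMINATE allowed=%b%n",
                        savepoint, synchronous,
                        isAllowed(savepoint, synchronous, PostCheckpointAction.TERMINATE));
            }
        }
    }
}
```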
RpcTaskManagerGateway is an implementation of TaskManagerGateway. Here is RpcTaskManagerGateway.triggerCheckpoint:
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions) {
return taskExecutorGateway.triggerCheckpoint(
executionAttemptID, checkpointId, timestamp, checkpointOptions);
}
TaskExecutor is one implementation of TaskExecutorGateway. Next is TaskExecutor.triggerCheckpoint:
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug(
"Trigger checkpoint {}@{} for {}.",
checkpointId,
checkpointTimestamp,
executionAttemptID);
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
&& !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
final Task task = taskSlotTable.getTask(executionAttemptID);
//If the task is known, trigger the checkpoint barrier on it
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager received a checkpoint request for unknown task "
+ executionAttemptID
+ '.';
log.debug(message);
return FutureUtils.completedExceptionally(
new CheckpointException(
message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
}
}
Task.triggerCheckpointBarrier:
/**
* Calls the invokable to trigger a checkpoint.
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
* @param checkpointOptions Options for performing this checkpoint.
*/
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(
checkpointID, checkpointTimestamp, System.currentTimeMillis());
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
invokable
.triggerCheckpointAsync(checkpointMetaData, checkpointOptions)
.handle(
(triggerResult, exception) -> {
if (exception != null || !triggerResult) {
declineCheckpoint(
checkpointID,
CheckpointFailureReason.TASK_FAILURE,
exception);
return false;
}
return true;
});
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting
// down, so we just ignore it.
LOG.debug(
"Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
checkpointID,
taskNameWithSubtask,
executionId);
declineCheckpoint(
checkpointID, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
} catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(
new Exception(
"Error while triggering checkpoint "
+ checkpointID
+ " for "
+ taskNameWithSubtask,
t));
} else {
LOG.debug(
"Encountered error while triggering checkpoint {} for "
+ "{} ({}) while being not in state running.",
checkpointID,
taskNameWithSubtask,
executionId,
t);
}
}
} else {
LOG.debug(
"Declining checkpoint request for non-running task {} ({}).",
taskNameWithSubtask,
executionId);
// send back a message that we did not do the checkpoint
declineCheckpoint(
checkpointID, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
}
}
triggerCheckpointAsync is implemented differently by different invokables. Here is one concrete implementation, StreamTask.triggerCheckpointAsync:
@Override
public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
mainMailboxExecutor.execute(
() -> {
try {
boolean noUnfinishedInputGates =
Arrays.stream(getEnvironment().getAllInputGates())
.allMatch(InputGate::isFinished);
//All InputGates have finished, i.e. all input has been received
if (noUnfinishedInputGates) {
result.complete(
triggerCheckpointAsyncInMailbox(
checkpointMetaData, checkpointOptions));
} else {
result.complete(
triggerUnfinishedChannelsCheckpoint(
checkpointMetaData, checkpointOptions));
}
} catch (Exception ex) {
// Report the failure both via the Future result but also to the mailbox
result.completeExceptionally(ex);
throw ex;
}
},
"checkpoint %s with %s",
checkpointMetaData,
checkpointOptions);
return result;
}
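StreamTask does not run the checkpoint on the calling (RPC) thread. Instead it enqueues a mail and returns a future, which the mailbox thread completes later. The following hypothetical sketch shows that hand-off, with a single-thread executor named "mailbox" standing in for mainMailboxExecutor. Unlike the real code, the sketch does not rethrow the exception into the mailbox, because a plain Runnable cannot throw checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a single-thread executor stands in for StreamTask's mailbox.
class MailboxTriggerSketch {
    // Stand-in for triggerCheckpointAsyncInMailbox / triggerUnfinishedChannelsCheckpoint.
    interface CheckpointAction {
        boolean perform() throws Exception;
    }

    static CompletableFuture<Boolean> triggerCheckpointAsync(ExecutorService mailbox, CheckpointAction action) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(() -> {
            try {
                result.complete(action.perform());
            } catch (Exception ex) {
                // report via the future (the real code additionally rethrows into the mailbox)
                result.completeExceptionally(ex);
            }
        });
        return result; // returned immediately; completed later by the mailbox thread
    }

    public static void main(String[] args) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            boolean ranInMailbox = triggerCheckpointAsync(
                    mailbox, () -> Thread.currentThread().getName().equals("mailbox")).join();
            System.out.println("ran in mailbox thread: " + ranInMailbox);
        } finally {
            mailbox.shutdown();
        }
    }
}
```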
private boolean triggerCheckpointAsyncInMailbox(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
throws Exception {
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
latestAsyncCheckpointStartDelayNanos =
1_000_000
* Math.max(
0,
System.currentTimeMillis() - checkpointMetaData.getTimestamp());
// No alignment if we inject a checkpoint
// Create the checkpoint metrics builder, which collects metrics for this checkpoint
CheckpointMetricsBuilder checkpointMetrics =
new CheckpointMetricsBuilder()
.setAlignmentDurationNanos(0L)
.setBytesProcessedDuringAlignment(0L)
.setCheckpointStartDelayNanos(latestAsyncCheckpointStartDelayNanos);
subtaskCheckpointCoordinator.initInputsCheckpoint(
checkpointMetaData.getCheckpointId(), checkpointOptions);
//Perform the checkpoint
boolean success =
performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
if (!success) {
declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;
} catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
throw new Exception(
"Could not perform checkpoint "
+ checkpointMetaData.getCheckpointId()
+ " for operator "
+ getName()
+ '.',
e);
} else {
LOG.debug(
"Could not perform checkpoint {} for operator {} while the "
+ "invokable was not in state running.",
checkpointMetaData.getCheckpointId(),
getName(),
e);
return false;
}
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
}
performCheckpoint:
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics)
throws Exception {
LOG.debug(
"Starting checkpoint {} {} on task {}",
checkpointMetaData.getCheckpointId(),
checkpointOptions.getCheckpointType(),
getName());
if (isRunning) {
actionExecutor.runThrowing(
() -> {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
setSynchronousSavepoint(
checkpointMetaData.getCheckpointId(),
checkpointOptions.getCheckpointType().shouldDrain());
}
if (areCheckpointsWithFinishedTasksEnabled()
&& endOfDataReceived
&& this.finalCheckpointMinId == null) {
this.finalCheckpointMinId = checkpointMetaData.getCheckpointId();
}
subtaskCheckpointCoordinator.checkpointState(
checkpointMetaData,
checkpointOptions,
checkpointMetrics,
operatorChain,
finishedOperators,
this::isRunning);
});
return true;
} else {
actionExecutor.runThrowing(
() -> {
// we cannot perform our checkpoint - let the downstream operators know that
// they should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain',
// because it may not yet be created
final CancelCheckpointMarker message =
new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
recordWriter.broadcastEvent(message);
});
return false;
}
}
The checkpoint logic itself is carried out by subtaskCheckpointCoordinator.checkpointState:
@Override
public void checkpointState(
CheckpointMetaData metadata,
CheckpointOptions options,
CheckpointMetricsBuilder metrics,
OperatorChain<?, ?> operatorChain,
boolean isOperatorsFinished,
Supplier<Boolean> isRunning)
throws Exception {
checkNotNull(options);
checkNotNull(metrics);
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect
// downstream checkpoint alignments
if (lastCheckpointId >= metadata.getCheckpointId()) {
LOG.info(
"Out of order checkpoint barrier (aborted previously?): {} >= {}",
lastCheckpointId,
metadata.getCheckpointId());
channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
checkAndClearAbortedStatus(metadata.getCheckpointId());
return;
}
logCheckpointProcessingDelay(metadata);
// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
// if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
// A CancelCheckpointMarker travels through the data stream like a CheckpointBarrier, but signals
// that a checkpoint should be cancelled: any in-progress alignment for it is cancelled and
// regular processing resumes.
// broadcast cancel checkpoint marker to avoid downstream back-pressure due to
// checkpoint barrier align.
operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info(
"Checkpoint {} has been notified as aborted, would not trigger any checkpoint.",
metadata.getCheckpointId());
return;
}
// if checkpoint has been previously unaligned, but was forced to be aligned (pointwise
// connection), revert it here so that it can jump over output data
if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
options = options.withUnalignedSupported();
initInputsCheckpoint(metadata.getCheckpointId(), options);
}
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
options.isUnalignedCheckpoint());
// Step (3): Prepare to spill the in-flight buffers for input and output
if (options.isUnalignedCheckpoint()) {
// output data already written while broadcasting event
channelStateWriter.finishOutput(metadata.getCheckpointId());
}
// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
// progress of the streaming topology
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
new HashMap<>(operatorChain.getNumberOfOperators());
try {
if (takeSnapshotSync(
snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
finishAndReportAsync(
snapshotFutures,
metadata,
metrics,
operatorChain.isFinishedOnRestore(),
isOperatorsFinished,
isRunning);
} else {
cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
}
} catch (Exception ex) {
cleanup(snapshotFutures, metadata, metrics, ex);
throw ex;
}
}
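The step order can be summarized with a small hypothetical model, CheckpointStateOrderSketch. It records events instead of touching operators, channels, or state backends. It keeps both early exits: out-of-order barriers, and checkpoints that have already been notified as aborted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the step order in checkpointState; events replace real side effects.
class CheckpointStateOrderSketch {
    private long lastCheckpointId = -1L;
    private final Set<Long> abortedCheckpointIds = new HashSet<>();
    final List<String> events = new ArrayList<>();

    // Stand-in for an abort notification arriving before the barrier is processed.
    void notifyAborted(long checkpointId) {
        abortedCheckpointIds.add(checkpointId);
    }

    void checkpointState(long checkpointId, boolean unaligned) {
        if (lastCheckpointId >= checkpointId) {
            events.add("ignore out-of-order " + checkpointId);
            abortedCheckpointIds.remove(checkpointId);
            return;
        }
        lastCheckpointId = checkpointId;                               // Step (0)
        if (abortedCheckpointIds.remove(checkpointId)) {
            events.add("broadcast CancelCheckpointMarker " + checkpointId);
            return;
        }
        events.add("prepareSnapshotPreBarrier " + checkpointId);       // Step (1)
        events.add("broadcast CheckpointBarrier " + checkpointId);     // Step (2)
        if (unaligned) {
            events.add("finish output channel state " + checkpointId); // Step (3)
        }
        events.add("snapshot state " + checkpointId);                  // Step (4)
    }

    public static void main(String[] args) {
        CheckpointStateOrderSketch task = new CheckpointStateOrderSketch();
        task.checkpointState(1L, false);
        task.notifyAborted(2L);
        task.checkpointState(2L, true);
        task.checkpointState(2L, false);
        task.checkpointState(3L, true);
        task.events.forEach(System.out::println);
    }
}
```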
finishAndReportAsync is asynchronous. Following the call, you can see that the actual work is done in the run method of AsyncCheckpointRunnable, a Runnable executed on a separate thread pool:
@Override
public void run() {
final long asyncStartNanos = System.nanoTime();
final long asyncStartDelayMillis = (asyncStartNanos - asyncConstructionNanos) / 1_000_000L;
LOG.debug(
"{} - started executing asynchronous part of checkpoint {}. Asynchronous start delay: {} ms",
taskName,
checkpointMetaData.getCheckpointId(),
asyncStartDelayMillis);
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
//Collect the final results of the snapshots
SnapshotsFinalizeResult snapshotsFinalizeResult =
isFinishedOnRestore
? new SnapshotsFinalizeResult(
TaskStateSnapshot.FINISHED_ON_RESTORE,
TaskStateSnapshot.FINISHED_ON_RESTORE,
0L)
: finalizeNonFinishedSnapshots();
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncConstructionNanos) / 1_000_000L;
// Record the bytes persisted during alignment and the duration of the asynchronous part
checkpointMetrics.setBytesPersistedDuringAlignment(
snapshotsFinalizeResult.bytesPersistedDuringAlignment);
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
// Atomically move the async checkpoint state from RUNNING to COMPLETED
if (asyncCheckpointState.compareAndSet(
AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
// Report the completed snapshot states
reportCompletedSnapshotStates(
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
asyncDurationMillis);
} else {
LOG.debug(
"{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
taskName,
checkpointMetaData.getCheckpointId());
}
finishedFuture.complete(null);
} catch (Exception e) {
LOG.info(
"{} - asynchronous part of checkpoint {} could not be completed.",
taskName,
checkpointMetaData.getCheckpointId(),
e);
handleExecutionException(e);
finishedFuture.completeExceptionally(e);
} finally {
unregisterConsumer.accept(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
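The compareAndSet on asyncCheckpointState settles a race between two parties. One is the async thread finishing its work; the other is the runnable being closed, for example on cancellation. Whichever side wins the CAS proceeds, and the other backs off. This is a hypothetical sketch, and the DISCARDED value for the closed state is an assumption.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the RUNNING -> COMPLETED / RUNNING -> DISCARDED race.
class AsyncStateSketch {
    enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<AsyncCheckpointState> state =
            new AtomicReference<>(AsyncCheckpointState.RUNNING);
    private int reports = 0;

    // Async thread: report only if nobody closed the runnable in the meantime.
    boolean finishAndReport() {
        if (state.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
            reports++; // stands in for reportCompletedSnapshotStates(...)
            return true;
        }
        return false;
    }

    // Task thread, e.g. on cancellation: discard only if the report has not happened yet.
    boolean close() {
        return state.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.DISCARDED);
    }

    int getReports() {
        return reports;
    }

    public static void main(String[] args) {
        AsyncStateSketch finishedFirst = new AsyncStateSketch();
        System.out.println(finishedFirst.finishAndReport() + " " + finishedFirst.close());
        AsyncStateSketch closedFirst = new AsyncStateSketch();
        System.out.println(closedFirst.close() + " " + closedFirst.finishAndReport());
    }
}
```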
Next, the completed snapshot is reported:
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) {
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
checkState(
hasAckState || !hasLocalState,
"Found cached state but no corresponding primary state is reported to the job "
+ "manager. This indicates a problem.");
// we signal stateless tasks by reporting null, so that there are no attempts to assign
// empty state to stateless tasks on restore. This enables simple job modifications that only
// concern stateless without the need to assign them uids to match their (always empty) states.
taskEnvironment
.getTaskStateManager()
.reportTaskStateSnapshots(
checkpointMetaData,
checkpointMetrics
.setTotalBytesPersisted(
acknowledgedTaskStateSnapshot.getStateSize())
.build(),
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
LOG.debug(
"{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
taskName,
checkpointMetaData.getCheckpointId(),
asyncDurationMillis);
LOG.trace(
"{} - reported the following states in snapshot for checkpoint {}: {}.",
taskName,
checkpointMetaData.getCheckpointId(),
acknowledgedTaskStateSnapshot);
}
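reportCompletedSnapshotStates applies two rules. First, local state without primary state is an error. Second, an empty snapshot is reported as null. The sketch below uses snapshot sizes instead of TaskStateSnapshot, a hypothetical simplification in which size 0 means "no state".

```java
// Hypothetical simplification: a snapshot is represented by its size in bytes, 0 meaning "no state".
class ReportStatesSketch {
    static final class Report {
        final Long acknowledgedSize; // null tells the JobManager "this task is stateless"
        final Long localSize;        // null means nothing is kept for local recovery

        Report(Long acknowledgedSize, Long localSize) {
            this.acknowledgedSize = acknowledgedSize;
            this.localSize = localSize;
        }
    }

    static Report report(long ackStateSize, long localStateSize) {
        boolean hasAckState = ackStateSize > 0;
        boolean hasLocalState = localStateSize > 0;
        // local (cached) state without a corresponding primary copy indicates a bug
        if (!(hasAckState || !hasLocalState)) {
            throw new IllegalStateException(
                    "Found cached state but no corresponding primary state is reported to the job manager.");
        }
        return new Report(hasAckState ? ackStateSize : null, hasLocalState ? localStateSize : null);
    }

    public static void main(String[] args) {
        Report stateless = report(0L, 0L);
        Report stateful = report(2048L, 2048L);
        System.out.println(stateless.acknowledgedSize + " / " + stateful.acknowledgedSize);
    }
}
```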
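The completion message is sent by TaskStateManagerImpl.reportTaskStateSnapshots, the implementation of TaskStateManager. It stores the local state and then acknowledges the checkpoint through the checkpointResponder: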
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
localStateStore.storeLocalState(checkpointId, localState);
checkpointResponder.acknowledgeCheckpoint(
jobId, executionAttemptID, checkpointId, checkpointMetrics, acknowledgedState);
}
checkpointResponder.acknowledgeCheckpoint is implemented by RpcCheckpointResponder.acknowledgeCheckpoint:
@Override
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
checkpointCoordinatorGateway.acknowledgeCheckpoint(
jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
}
checkpointCoordinatorGateway.acknowledgeCheckpoint is implemented by JobMaster.acknowledgeCheckpoint:
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
schedulerNG.acknowledgeCheckpoint(
jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
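schedulerNG.acknowledgeCheckpoint is implemented by SchedulerBase.acknowledgeCheckpoint, which delegates to ExecutionGraphHandler.acknowledgeCheckpoint (both shown below). The latter passes the message on to the CheckpointCoordinator: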
@Override
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
executionGraphHandler.acknowledgeCheckpoint(
jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
processCheckpointCoordinatorMessage(
"AcknowledgeCheckpoint",
coordinator ->
coordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState),
retrieveTaskManagerLocation(executionAttemptID)));
}
CheckpointCoordinator.receiveAcknowledgeMessage:
public boolean receiveAcknowledgeMessage(
AcknowledgeCheckpoint message, String taskManagerLocationInfo)
throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error(
"Received wrong AcknowledgeCheckpoint message for job {} from {} : {}",
job,
taskManagerLocationInfo,
message);
return false;
}
final long checkpointId = message.getCheckpointId();
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
// Look up the in-progress (pending) checkpoint
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDisposed()) {
switch (checkpoint.acknowledgeTask(
message.getTaskExecutionId(),
message.getSubtaskState(),
message.getCheckpointMetrics(),
getStatsCallback(checkpoint))) {
// On a successful ack, complete the PendingCheckpoint once all acknowledgements have been received
case SUCCESS:
LOG.debug(
"Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug(
"Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
break;
case UNKNOWN:
LOG.warn(
"Could not acknowledge the checkpoint {} for task {} of job {} at {}, "
+ "because the task's execution attempt id was unknown. Discarding "
+ "the state handle to avoid lingering state.",
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
discardSubtaskState(
message.getJob(),
message.getTaskExecutionId(),
message.getCheckpointId(),
message.getSubtaskState());
break;
case DISCARDED:
LOG.warn(
"Could not acknowledge the checkpoint {} for task {} of job {} at {}, "
+ "because the pending checkpoint had been discarded. Discarding the "
+ "state handle to avoid lingering state.",
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
discardSubtaskState(
message.getJob(),
message.getTaskExecutionId(),
message.getCheckpointId(),
message.getSubtaskState());
}
return true;
} else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint "
+ checkpointId);
} else {
reportStats(
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getCheckpointMetrics());
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn(
"Received late message for now expired checkpoint attempt {} from task "
+ "{} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
} else {
LOG.debug(
"Received message for an unknown checkpoint {} from task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(
message.getJob(),
message.getTaskExecutionId(),
message.getCheckpointId(),
message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
Next, the acknowledgement handling in PendingCheckpoint, checkpoint.acknowledgeTask:
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
TaskStateSnapshot operatorSubtaskStates,
CheckpointMetrics metrics,
@Nullable PendingCheckpointStats statsCallback) {
synchronized (lock) {
// If the checkpoint has been disposed, return DISCARDED
if (disposed) {
return TaskAcknowledgeResult.DISCARDED;
}
// Remove the acknowledging task from notYetAcknowledgedTasks, which holds all tasks that have not acknowledged yet
final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
if (vertex == null) {
// The task is not in notYetAcknowledgedTasks: if it is already in acknowledgedTasks,
// this is a duplicate ack (DUPLICATE); otherwise the task is UNKNOWN
if (acknowledgedTasks.contains(executionAttemptId)) {
return TaskAcknowledgeResult.DUPLICATE;
} else {
return TaskAcknowledgeResult.UNKNOWN;
}
} else {
// Record the task as acknowledged
acknowledgedTasks.add(executionAttemptId);
}
List<OperatorIDPair> operatorIDs = vertex.getJobVertex().getOperatorIDs();
long ackTimestamp = System.currentTimeMillis();
for (OperatorIDPair operatorID : operatorIDs) {
if (operatorSubtaskStates != null && operatorSubtaskStates.isFinishedOnRestore()) {
updateFinishedOnRestoreOperatorState(vertex, operatorID);
} else {
updateNonFinishedOnRestoreOperatorState(
vertex, operatorSubtaskStates, operatorID);
}
}
++numAcknowledgedTasks;
// publish the checkpoint statistics
// to prevent null-pointers from concurrent modification, copy reference onto stack
if (statsCallback != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
long checkpointStartDelayMillis =
metrics.getCheckpointStartDelayNanos() / 1_000_000;
SubtaskStateStats subtaskStateStats =
new SubtaskStateStats(
vertex.getParallelSubtaskIndex(),
ackTimestamp,
metrics.getTotalBytesPersisted(),
metrics.getSyncDurationMillis(),
metrics.getAsyncDurationMillis(),
metrics.getBytesProcessedDuringAlignment(),
metrics.getBytesPersistedDuringAlignment(),
alignmentDurationMillis,
checkpointStartDelayMillis,
metrics.getUnalignedCheckpoint(),
true);
LOG.trace(
"Checkpoint {} stats for {}: size={}Kb, duration={}ms, sync part={}ms, async part={}ms",
checkpointId,
vertex.getTaskNameWithSubtaskIndex(),
subtaskStateStats.getStateSize() == 0
? 0
: subtaskStateStats.getStateSize() / 1024,
subtaskStateStats.getEndToEndDuration(statsCallback.getTriggerTimestamp()),
subtaskStateStats.getSyncCheckpointDuration(),
subtaskStateStats.getAsyncCheckpointDuration());
// Report this subtask's checkpoint statistics
statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
}
return TaskAcknowledgeResult.SUCCESS;
}
}
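Stripped of state handling and statistics, the bookkeeping in acknowledgeTask is just moving a task ID from one set to another and classifying the acknowledgement. The following is a minimal, self-contained sketch of that logic only, not Flink's code: the class MiniPendingCheckpoint, the AckResult enum and the plain String task IDs (instead of ExecutionAttemptID) are hypothetical simplifications.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of PendingCheckpoint's acknowledgement bookkeeping.
public class MiniPendingCheckpoint {

    // Mirrors the four outcomes of Flink's TaskAcknowledgeResult.
    public enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Object lock = new Object();
    private final Set<String> notYetAcknowledgedTasks = new HashSet<>();
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private boolean disposed;
    private int numAcknowledgedTasks;

    public MiniPendingCheckpoint(Set<String> tasksToAcknowledge) {
        notYetAcknowledgedTasks.addAll(tasksToAcknowledge);
    }

    public AckResult acknowledgeTask(String taskId) {
        synchronized (lock) {
            // A discarded checkpoint accepts no more acknowledgements.
            if (disposed) {
                return AckResult.DISCARDED;
            }
            // remove() returns false if this task was not waiting to acknowledge.
            if (!notYetAcknowledgedTasks.remove(taskId)) {
                return acknowledgedTasks.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
            }
            acknowledgedTasks.add(taskId);
            numAcknowledgedTasks++;
            return AckResult.SUCCESS;
        }
    }

    public void dispose() {
        synchronized (lock) {
            disposed = true;
        }
    }

    public int getNumAcknowledgedTasks() {
        synchronized (lock) {
            return numAcknowledgedTasks;
        }
    }

    public static void main(String[] args) {
        MiniPendingCheckpoint cp = new MiniPendingCheckpoint(Set.of("t1", "t2"));
        System.out.println(cp.acknowledgeTask("t1")); // SUCCESS
        System.out.println(cp.acknowledgeTask("t1")); // DUPLICATE
        System.out.println(cp.acknowledgeTask("t9")); // UNKNOWN
        cp.dispose();
        System.out.println(cp.acknowledgeTask("t2")); // DISCARDED
    }
}
```

Checking notYetAcknowledgedTasks first keeps the common path (a first, valid acknowledgement) to a single set removal; the acknowledgedTasks lookup is only needed to tell a duplicate apart from an unknown task.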
Now let's go back and see when completePendingCheckpoint is called. Back in receiveAcknowledgeMessage, when acknowledgeTask returns SUCCESS, the coordinator runs:
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
public boolean isFullyAcknowledged() {
return areTasksFullyAcknowledged()
&& areCoordinatorsFullyAcknowledged()
&& areMasterStatesFullyAcknowledged();
}
boolean areTasksFullyAcknowledged() {
return notYetAcknowledgedTasks.isEmpty() && !disposed;
}
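To make the gating concrete, here is a hypothetical sketch showing that completion fires exactly once, on the last missing acknowledgement. CompletionGate is an illustrative name, not a Flink class; the two constant flags stand in for areCoordinatorsFullyAcknowledged() and areMasterStatesFullyAcknowledged() (assumed already satisfied), and the counter stands in for the call to completePendingCheckpoint.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: completion fires only once every acknowledgement condition holds.
public class CompletionGate {
    private final Set<String> notYetAcknowledgedTasks = new HashSet<>();
    private final boolean disposed = false;          // assumption: never discarded in this sketch
    private final boolean coordinatorsAcked = true;  // assumption: no operator coordinators pending
    private final boolean masterStatesAcked = true;  // assumption: no master states pending
    private int completions;

    public CompletionGate(Set<String> tasks) {
        notYetAcknowledgedTasks.addAll(tasks);
    }

    boolean areTasksFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty() && !disposed;
    }

    boolean isFullyAcknowledged() {
        return areTasksFullyAcknowledged() && coordinatorsAcked && masterStatesAcked;
    }

    // Simplified counterpart of the SUCCESS branch in receiveAcknowledgeMessage.
    public void receiveAck(String taskId) {
        boolean success = notYetAcknowledgedTasks.remove(taskId);
        if (success && isFullyAcknowledged()) {
            completions++; // stands in for completePendingCheckpoint(checkpoint)
        }
    }

    public int getCompletions() {
        return completions;
    }

    public static void main(String[] args) {
        CompletionGate gate = new CompletionGate(Set.of("t1", "t2", "t3"));
        gate.receiveAck("t1");
        gate.receiveAck("t2");
        System.out.println(gate.getCompletions()); // 0: t3 is still missing
        gate.receiveAck("t3");
        System.out.println(gate.getCompletions()); // 1
        gate.receiveAck("t3");
        System.out.println(gate.getCompletions()); // 1: a duplicate ack does not complete again
    }
}
```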
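As shown above, completePendingCheckpoint is called only when the checkpoint has not been disposed and notYetAcknowledgedTasks is empty, and, in addition, all operator coordinators and master states have acknowledged. Next, let's look at the logic of completePendingCheckpoint: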
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
throws CheckpointException {
assert Thread.holdsLock(lock);
final long checkpointId = pendingCheckpoint.getCheckpointId();
final CompletedCheckpoint completedCheckpoint;
// Register the state of all operators with sharedStateRegistry
// As a first step to complete the checkpoint, we register its state with the registry
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());
try {
try {
// Finalize the pending checkpoint into a CompletedCheckpoint
completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint(
checkpointsCleaner,
this::scheduleTriggerRequest,
executor,
getStatsCallback(pendingCheckpoint));
// Reset the counter of failed checkpoints
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
} catch (Exception e1) {
// If the checkpoint cannot be completed, abort it
// abort the current pending checkpoint if we fail to finalize the pending
// checkpoint.
if (!pendingCheckpoint.isDisposed()) {
abortPendingCheckpoint(
pendingCheckpoint,
new CheckpointException(
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
}
throw new CheckpointException(
"Could not finalize the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
e1);
}
// Sanity check: after finalizeCheckpoint, pendingCheckpoint must be disposed and completedCheckpoint must not be null
// the pending checkpoint must be discarded after the finalization
Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null);
try {
// Store the completed checkpoint
completedCheckpointStore.addCheckpoint(
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
} catch (Exception exception) {
if (exception instanceof PossibleInconsistentStateException) {
LOG.warn(
"An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.",
completedCheckpoint.getCheckpointID(),
completedCheckpoint.getExternalPointer());
} else {
// we failed to store the completed checkpoint. Let's clean up
checkpointsCleaner.cleanCheckpointOnFailedStoring(
completedCheckpoint, executor);
}
sendAbortedMessages(
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
checkpointId,
pendingCheckpoint.getCheckpointTimestamp());
throw new CheckpointException(
"Could not complete the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
exception);
}
if (checkpointId > latestCompletedCheckpointId) {
latestCompletedCheckpointId = checkpointId;
}
} finally {
// Remove this checkpoint from the set of in-progress checkpoints
pendingCheckpoints.remove(checkpointId);
scheduleTriggerRequest();
}
// Remember the most recent checkpoint ID
rememberRecentCheckpointId(checkpointId);
// drop those pending checkpoints that are prior to the completed one,
// i.e. every pending checkpoint with an ID smaller than checkpointId that was still unfinished when this newer checkpoint started
dropSubsumedCheckpoints(checkpointId);
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
LOG.info(
"Completed checkpoint {} for job {} ({} bytes, checkpointDuration={} ms, finalizationTime={} ms).",
checkpointId,
job,
completedCheckpoint.getStateSize(),
completedCheckpoint.getCompletionTimestamp() - completedCheckpoint.getTimestamp(),
System.currentTimeMillis() - completedCheckpoint.getCompletionTimestamp());
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
builder.append(state);
builder.append(", ");
}
// Remove last two chars ", "
builder.setLength(builder.length() - 2);
LOG.debug(builder.toString());
}
// send the "notify complete" call to all vertices, coordinators, etc.
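// Send the checkpoint-complete message to every vertex; this call is important
// The notification travels down level by level: to each task (StreamTask), then to every operator, then to the user functions
// Finally, for each user function that implements the CheckpointListener interface,
// its notifyCheckpointComplete method is called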
sendAcknowledgeMessages(
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
checkpointId,
completedCheckpoint.getTimestamp());
}
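The bookkeeping around the end of completePendingCheckpoint (removing the checkpoint from pendingCheckpoints in a finally block, advancing latestCompletedCheckpointId, then dropping older pending checkpoints) maps naturally onto a sorted map. This is a simplified sketch under stated assumptions: MiniCoordinator is hypothetical, a String label stands in for PendingCheckpoint, and "dropping" just removes entries, whereas Flink's dropSubsumedCheckpoints aborts the subsumed checkpoints.

```java
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of the bookkeeping done after a checkpoint completes.
public class MiniCoordinator {
    // Pending checkpoints ordered by checkpoint ID (a label stands in for PendingCheckpoint).
    private final TreeMap<Long, String> pendingCheckpoints = new TreeMap<>();
    private long latestCompletedCheckpointId = -1L;

    public void trigger(long checkpointId) {
        pendingCheckpoints.put(checkpointId, "checkpoint-" + checkpointId);
    }

    public void complete(long checkpointId) {
        try {
            // In Flink, finalizing and storing the checkpoint happen here and may throw.
            if (checkpointId > latestCompletedCheckpointId) {
                latestCompletedCheckpointId = checkpointId;
            }
        } finally {
            // Runs even if finalizing or storing failed, like the finally block above.
            pendingCheckpoints.remove(checkpointId);
        }
        // Only reached when no exception was thrown.
        dropSubsumedCheckpoints(checkpointId);
    }

    // Removes every pending checkpoint whose ID is smaller than the completed one.
    private void dropSubsumedCheckpoints(long checkpointId) {
        // headMap(id, false) is a live view of all keys < id; clearing it edits the map.
        pendingCheckpoints.headMap(checkpointId, false).clear();
    }

    public Set<Long> getPendingIds() {
        return pendingCheckpoints.keySet();
    }

    public long getLatestCompletedCheckpointId() {
        return latestCompletedCheckpointId;
    }

    public static void main(String[] args) {
        MiniCoordinator coordinator = new MiniCoordinator();
        coordinator.trigger(1);
        coordinator.trigger(2);
        coordinator.trigger(3);
        coordinator.complete(2);
        System.out.println(coordinator.getPendingIds());                 // [3]
        System.out.println(coordinator.getLatestCompletedCheckpointId()); // 2
    }
}
```

Checkpoint 3 survives because it was triggered after checkpoint 2 and may still complete; checkpoint 1 is dropped because a newer checkpoint has already completed.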
Finally, let's look at how a PendingCheckpoint is turned into a CompletedCheckpoint:
public CompletedCheckpoint finalizeCheckpoint(
CheckpointsCleaner checkpointsCleaner,
Runnable postCleanup,
Executor executor,
@Nullable PendingCheckpointStats statsCallback)
throws IOException {
synchronized (lock) {
checkState(!isDisposed(), "checkpoint is discarded");
// Make sure all tasks have acknowledged
checkState(
isFullyAcknowledged(),
"Pending checkpoint has not been fully acknowledged yet");
// make sure we fulfill the promise with an exception if something fails
try {
if (checkpointPlan.isMayHaveFinishedTasks()) {
Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex = new HashMap<>();
for (Execution task : checkpointPlan.getFinishedTasks()) {
JobVertexID jobVertexId = task.getVertex().getJobvertexId();
if (!fullyFinishedOrFinishedOnRestoreVertex.contains(jobVertexId)) {
partlyFinishedVertex.put(jobVertexId, task.getVertex().getJobVertex());
}
}
checkNoPartlyFinishedVertexUsedUnionListState(partlyFinishedVertex);
checkNoPartlyOperatorsFinishedVertexUsedUnionListState(partlyFinishedVertex);
}
fulfillFullyFinishedOperatorStates();
// write out the metadata
// Create the checkpoint metadata object
final CheckpointMetadata savepoint =
new CheckpointMetadata(checkpointId, operatorStates.values(), masterStates);
final CompletedCheckpointStorageLocation finalizedLocation;
// Write the checkpoint metadata to the target storage location (e.g. a file system)
try (CheckpointMetadataOutputStream out =
targetLocation.createMetadataOutputStream()) {
Checkpoints.storeCheckpointMetadata(savepoint, out);
finalizedLocation = out.closeAndFinalizeCheckpoint();
}
CompletedCheckpoint completed =
new CompletedCheckpoint(
jobId,
checkpointId,
checkpointTimestamp,
System.currentTimeMillis(),
operatorStates,
masterStates,
props,
finalizedLocation);
// Complete the CompletableFuture with completedCheckpoint
onCompletionPromise.complete(completed);
// to prevent null-pointers from concurrent modification, copy reference onto stack
if (statsCallback != null) {
LOG.trace(
"Checkpoint {} size: {}Kb, duration: {}ms",
checkpointId,
statsCallback.getStateSize() == 0
? 0
: statsCallback.getStateSize() / 1024,
statsCallback.getEndToEndDuration());
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
CompletedCheckpointStats.DiscardCallback discardCallback =
statsCallback.reportCompletedCheckpoint(
finalizedLocation.getExternalPointer());
completed.setDiscardCallback(discardCallback);
}
// mark this pending checkpoint as disposed, but do NOT drop the state
dispose(false, checkpointsCleaner, postCleanup, executor);
return completed;
} catch (Throwable t) {
onCompletionPromise.completeExceptionally(t);
ExceptionUtils.rethrowIOException(t);
return null; // silence the compiler
}
}
}
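finalizeCheckpoint follows a common pattern: check preconditions, write the metadata, fulfil a CompletableFuture, or complete it exceptionally and rethrow as an IOException on failure. The sketch below reproduces only that pattern with JDK classes. Assumptions: MiniFinalizer is hypothetical, the "metadata" is just the checkpoint ID and an operator count written with DataOutputStream (Flink's real format is produced by Checkpoints.storeCheckpointMetadata), and the returned byte array stands in for CompletedCheckpoint.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the finalize pattern: check, write metadata, fulfil the promise.
public class MiniFinalizer {
    private final long checkpointId;
    private final int numOperators;
    private final boolean fullyAcknowledged;
    // Fulfilled with the serialized metadata, like onCompletionPromise in PendingCheckpoint.
    private final CompletableFuture<byte[]> onCompletionPromise = new CompletableFuture<>();
    private boolean disposed;

    public MiniFinalizer(long checkpointId, int numOperators, boolean fullyAcknowledged) {
        this.checkpointId = checkpointId;
        this.numOperators = numOperators;
        this.fullyAcknowledged = fullyAcknowledged;
    }

    public CompletableFuture<byte[]> getCompletionFuture() {
        return onCompletionPromise;
    }

    public synchronized byte[] finalizeCheckpoint() throws IOException {
        // Same preconditions as the original, raised as IllegalStateException like checkState.
        if (disposed) {
            throw new IllegalStateException("checkpoint is discarded");
        }
        if (!fullyAcknowledged) {
            throw new IllegalStateException("Pending checkpoint has not been fully acknowledged yet");
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            // Closing the stream flushes it before the bytes are read.
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(checkpointId); // 8 bytes
                out.writeInt(numOperators);  // 4 bytes
            }
            byte[] metadata = bytes.toByteArray();
            onCompletionPromise.complete(metadata);
            // Mark as disposed without dropping any state, like dispose(false, ...).
            disposed = true;
            return metadata;
        } catch (Throwable t) {
            onCompletionPromise.completeExceptionally(t);
            // Rethrow IOExceptions and Errors as-is; wrap everything else in an IOException.
            if (t instanceof IOException) {
                throw (IOException) t;
            }
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IOException(t);
        }
    }

    public static void main(String[] args) throws IOException {
        MiniFinalizer finalizer = new MiniFinalizer(7L, 2, true);
        byte[] metadata = finalizer.finalizeCheckpoint();
        System.out.println(metadata.length);                          // 12
        System.out.println(finalizer.getCompletionFuture().isDone()); // true
    }
}
```

Completing the future inside the same try block means any caller waiting on it (in the original, for example, the savepoint path that maps the result to CompletedCheckpoint::getExternalPointer) sees either the finished checkpoint or the failure, never a future that hangs.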
Reference: https://www.jianshu.com/p/e88938b79f91