增量式Checkpoint实验(碰到bug没有搞定)

根据[1]:

'''这里面的核心就是 checkpoint 机制,Flink 使用 checkpoint 机制来进行状态保证,在 Flink 中 checkpoint 是一个定时触发的全局异步快照,并持久化到持久存储系统上(通常是分布式文件系统)。发生故障后,Flink 选择从最近的一个快照进行恢复。有用户的作业状态达到 GB 甚至 TB 级别,对这么大的作业状态做一次 checkpoint 会非常耗时,耗资源,因此我们在 Flink 1.3 中引入了增量 checkpoint 机制。

在增量 checkpoint 之前,Flink 的每个 checkpoint 都包含作业的所有状态。我们在观察到状态在 checkpoint 之间的变化并没有那么大之后,支持了增量 checkpoint。增量 checkpoint 仅包含上次 checkpoint 和本次 checkpoint 之间状态的差异(也就是“增量”)。'''

"

从 checkpoint 恢复以及性能

开启增量 checkpoint 之后,不需要再进行其他额外的配置。如果 Job 异常,Flink 的 JobMaster 会通知所有 task 从上一个成功的 checkpoint 进行恢复,不管是全量 checkpoint 还是增量 checkpoint。每个 TaskManager 会从持久化存储下载他们需要的状态文件。

尽管增量 checkpoint 能减少大状态下的 checkpoint 时间,但是天下没有免费的午餐,我们需要在其他方面进行舍弃。增量 checkpoint 可以减少 checkpoint 的总时间,但是也可能导致恢复的时候需要更长的时间。如果集群的故障频繁,Flink 的 TaskManager 需要从多个 checkpoint 中下载需要的状态文件(这些文件中包含一些已经被删除的状态),作业恢复的整体时间可能比不使用增量 checkpoint 更长。

另外在增量 checkpoint 情况下,我们不能删除旧 checkpoint 生成的文件,因为新的 checkpoint 会继续引用它们,这可能导致需要更多的存储空间,并且恢复的时候可能消耗更多的带宽。

关于控制便捷性与性能之间平衡的策略可以参考此文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/large_state_tuning.html

"

从上面两段话也可以看出,如果频繁故障的话,可能并不是适合采用增量check point

具体实验步骤如下:

步骤 内容
mvn clean scala:compile compile package

nc -lk 9999

flink run -c wordcount_increstate /home/appleyuchi/桌面/Flink_Code/flink_state/checkpoint/Scala/target/datastream_api-1.0-SNAPSHOT.jar
Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b

连续4次故意输入error来触发job停止(模拟生产环境中的大数据框架崩溃)

before
error
error
error
error
 

选择checkpoints然后复制页面的中间一行

flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount /home/appleyuchi/桌面/Flink_Code/flink_state/checkpoint/Scala/target/datastream_api-1.0-SNAPSHOT.jar

在nc -lk 9999中输入

after

after

after

完整代码如下:
https://paste.ubuntu.com/p/DpTyQKq6Vk/

目前碰到bug:

https://blog.csdn.net/appleyuchi/article/details/108896697

暂时无法解决.

先放着.

Reference:

[1]Apache Flink 管理大型状态之增量 Checkpoint 详解

猜你喜欢

转载自blog.csdn.net/appleyuchi/article/details/108896441