目录
Min Write Buffer Number To Merge
1 什么是状态后端?
指定状态存储的方式和位置
2 状态后端分类?
MemoryStateBckend
运行时需要的State数据保存在TaskManager JVM 堆上内存中
checkpoint保存在JobManager进程的内存中、可选择同步/异步快照
注意
1)State存储在JobManager的内存中,受限于JobManager的内存大小
2)每个State默认5MB,可通过MemoryStateBackend构造函数调整
3)每个State不能超过Akka Frame 大小
FsStateBackend
运行时需要的State数据保存在TaskManager JVM 堆上内存中
checkpoint存储在文件系统中(HDFS)
运行时需要的State数据保存在TaskManager JVM 堆上内存中
适用场景
1)大状态、长窗口,或大键值状态的有状态处理任务
2)高可用
注意点
1)State数据首先会被存在TaskManager的内存中
2)State大小不能超过TM内存
3)TM异步将State数据写入外部存储
RocksDBStateBackend
使用嵌入式的本地数据库RocksDB将流计算数据状态存储在本地磁盘中,不会受限于TaskManager的内存大小
在执行检查点的时候,再将整个RocksDB中保存的State数据全量或者增量持久化到配置的文件系统中
在JobManager内存中会存储少量的检查点元数据
RocksDB克服了State受内存限制的问题,同时又能持久化到远端文件系统中,比较适合生产中使用
适用场景
1)大状态、长窗口,或大键值状态的有状态处理任务
2)高可用
3)RocksDBStateBackend支持增量检查点。增量检查点非常适用于超大状态的场景
注意点
1)总State大小仅限于磁盘大小,不受内存限制
2)RocksDBStateBackend也需要配置外部文件系统,集中保存State
3 RocksDB 大状态调优
对于需要保存超大状态(远超于内存容量)的流计算场景来说,目前 RocksDB 是 Flink 平台上官方实现的唯一选择。业界也有使用 Redis 等其他服务作为状态后端的方案,但终究不够成熟,且已被社区否决
RocksDB是基于LSM树原理实现的KV数据库,LSM树读放大问题比较严重,因此对磁盘性能要求比较高,强烈建议生产环境使用SSD作为RocksDB的存储介质。但是 有些集群可能没有配置SSD,仅仅是普通的机械硬盘,当Flink任务比较大,且对状态访问比较频繁时,机械硬盘的磁盘IO可能成为性能瓶颈。在这种情况下,该如何解决此瓶颈呢?
使用多块硬盘来分担压力
RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。
当一个TaskManager包含3个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘io,这样必定导致三个并行度的吞吐量都会下降。
庆幸的是Flink的state.backend.rocksdb.localdir 参数可以指定多个目录,一般大数据服务器都会挂载很多块硬盘,我们期望同一个TaskManager的三个slot使用不同的硬盘从而减少资源竞争。具体如下配置
-
设置本地Rocks多目录
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb
注意:务必将目录配置到多块不同的磁盘上,不要配置单块磁盘的多个目录,这里配置多个目录是为了让多块磁盘来分担压力
如下所示是测试过程中磁盘的 IO 使用率,可以看出三个大状态算子的并行度分别对应了三块磁盘,这三块磁盘的 IO 平均使用率都保持在 45% 左右,IO 最高使用率几乎都是 100%,而其他磁盘的 IO 平均使用率相对低很多。由此可见使用 RocksDB 做为状态后端且有大状态的频繁读取时, 对磁盘IO性能消耗确实比较大。
-
开启增量Checkpoint
state.backend.incremental 开启增量检查点,默认false。或代码中指定new EmbededRocksDBStateBackend(true)
-
Block Size
state.backend.rocksdb.block.blocksize 默认值为4KB,建议生产环境调整到16~32KB,如果需要增加Block Size的大小来提升读写性能,请务必一并增加Block Cache Size,这样才能可以取得比较好的读写性能。如果内存已经吃紧,那么不建议继续增加Block Cache Size,否则会有OOM的风险(如果在容器环境下限定了使用的内存总量的话,会更明显),那相对来说,也就不建议继续增加Block Size 了,可以适当减少Block Size 来提升吞吐量,同时关注写性能是否可以接收。
-
Block Cache Size
state.backend.rocksdb.block.cache-size 整个RocksDB共享一个block cache读数据时内存的cache大小,该参数越大读数据时缓存命中率越高,默认大小为8MB,但是通常在内存富裕的情况下,建议设置到64~256MB
可以启动state.backend.rocksdb.metrics.metrics.block-cache-usage 监控指标,以实时观察Block Cache的用量情况,并作出针对性的优化
-
最大打开文件
state.backend.rocksdb.files.open 这个参数决定了RocksSB可以打开最大文件句柄数,默认值是5000,建议改为-1(无限制),这个参数如果过小,就会出现索引和过滤器Block无法载入内存的问题,导致读取性能大幅下滑,搭配cache_index_and_filter_blocks为false。
-
Cache Index And Filter Blocks
cache_index_and_filter_blocks(blockBasedTableConfig.setCacheIndexAndFilterBlocks),默认是false,标识不在内存里缓存索引和过滤器Block,而是用到了载入,不用就踢出去。如果设置为true,则表示允许这些索引和过滤器放到Block Cache中备用,这样可以提高局部数据存取的效率(无需磁盘访问就知道key在不在,以及在哪里)。但是,如果启用了这个选项,必须同时把pin_I0_filter_and_index_blocks_in_cache(blockBasedTableConfig.pinL0FilterAndIndexBlocksinCache)参数也设置为true,否则可能会因为操作系统的换页操作,导致性能抖动。
需要注意的是,对于此参数,一定要注意Block Cache的总大小有限,如果允许Index和Filter也放进去,那么用来存放数据的空间就少了。因此,建议在Key具有局部热点(某些Key频繁访问,而其他的Key访问的很少)的情况下,才打开这两个参数;对于分布比较随机的Key,这个参数甚至会起到反作用(随机Key时,读取性能大幅下降)
-
Optimize Filter For Hits
optimize_filters_for_hits这个参数(columnFamilyOptions.setOptimizeFiltersForHits)如果设置为true,则RocksDB不会给L0生成Bloom Filter,据文档中描述,可以减少90%的Filter存储开销,有利于减少内存占用。但是这个参数也仅仅适用于具有局部热点或者确信基本不会出现Cache Miss的场景,否则频繁的找不到,会拖累读取性能。
对于Cache和Filter等内存用量,可以通过Flink的state.backend.rocksdb.metrics.estimate-table-readers-mem 监控指标来估计
-
Write Buffer Size
state.backend.rocksdb.writebuffer.size 默认大小是64MB,通常来说Writer Buffer越大,写放大效应越小,因而写性能也会改善
-
Max Bytes For Level Base
state.backend.rocksdb.compaction.level.max-size-level-base.需要特别注意的是,如果增加Write Buffer Size,请一定要增加L1层的大小阈值(max_bytes_for_level_base),这个因子影响非常大。如果这个参数太小,那么每层都存放的SST文件就很少,层级就会变得很多,造成查找困难;如果这个参数太大,则会造成每层文件太多,那么执行Compaction等操作的耗时就会变得很长,此时容易出现Write Stall(写停止)现象,造成写入中断。
-
Write Buffer Count
Flink 的 state.backend.rocksdb.writebuffer.count 参数(也可以通过 columnFamilyOptions.setMaxWriteBufferNumber 设置)可以控制内存中允许保留的 MemTable 最大个数,超过这个个数后,就会被 Flush 刷写到磁盘上成为 SST 文件。
这个参数的默认值是 2,对于机械磁盘来说,如果内存足够大,可以调大到 5 左右,以令 MemTable 的大小减小一些,降低 Flush 操作时造成 Write Stall 的概率。
-
Min Write Buffer Number To Merge
Flink 的 state.backend.rocksdb.writebuffer.number-to-merge 参数(columnFamilyOptions.setMinWriteBufferNumberToMerge)决定了 Write Buffer 合并的最小阈值,默认值为 1,对于机械硬盘来说可以适当调大,避免频繁的 Merge 操作造成的写停顿。
根据我们的调优经验来看,这个参数调小、调大都会造成性能下滑,它的最佳值会在某个中间值附近,例如 3 等。
对于 MemTable 所占用的内存大小估算指标,可以启用 Flink 的 state.backend.rocksdb.metrics.cur-size-all-mem-tables 参数来实时监控。
-
Thread Num(Parallelism)
state.backend.rocksdb.thread.num 这个参数允许用户增加最大的后台 Compaction 和 Flush 操作的线程数,Flush 操作默认在高优先级队列,Compaction 操作则在低优先级队列。
默认的后台线程数为 1,机械硬盘用户可以改为 4 等更大的值。
如果后台所有的线程都在做 Compaction 操作时,如果这时候突然有很多写请求,就会引发写停顿(Write Stall)。写停顿可以通过日志或者监控指标来发现。
-
Write Batch Size
state.backend.rocksdb.write-batch-size 参数允许指定 RocksDB 批量写入时占用的最大内存量,默认为 2m,如果设置为 0 的话就会自动根据任务量进行调整。这个参数如果没有特别的需求,可以不调整。
-
Compaction Style
state.backend.rocksdb.compaction.style 参数(ColumnFamilyOptions 的 setCompactionStyle 方法)允许用户调整 Compaction 的组织方式,默认值是 LEVEL(较为均衡),但也可以改为 UNIVERSAL 或 FIFO.
相对于默认的 LEVEL 方式,UNIVERSAL 属于 Tiered 的一种,可以减少写放大效应,但是副作用是会增加空间放大和读放大效应,因此只适合写入量较大而读取不频繁,同时磁盘空间足够的场景。
FIFO 则适合于将 RocksDB 作为时序数据库的场景,因为它是先入先出算法,可以批量淘汰掉过期的旧数据。
-
Compression Type
ColumnFamilyOptions 类提供了 setCompressionType 方法,可以指定对 Block 的压缩算法。
RocksDB 提供了无压缩、Snappy、ZLib、BZlib2、LZ4、LZ4HC、Xpress、ZSTD 等多种压缩算法支持,但是需要注意的是,启用前需要确认系统中是否已经装好了对应的压缩算法库,否则可能无法正常运行。
如果追求性能,可以关闭压缩(NO_COMPRESSION);否则建议使用 LZ4 算法,其次是 Snappy 算法。启用压缩后,ReadOptions 的 verify_checksums 选项可以关闭,以提升读取速度(但是可能会受到磁盘坏块的影响)