Flink 状态实现分析
state
*             State
*               |
*               +-------------------InternalKvState
*               |                         |
*          MergingState                   |
*               |                         |
*               +-----------------InternalMergingState
*               |                         |
*      +--------+------+                  |
*      |               |                  |
* ReducingState    ListState        +-----+-----------------+
*      |               |            |                       |
*      +-----------+   +-----------   -----------------InternalListState
*                  |                |
*                  +---------InternalReducingState
MemoryState
RocksDBState
/**
 * Notes excerpt of Flink's RocksDBMapState: a MapState whose entries live in RocksDB.
 *
 * <p>Each user key is stored as its own RocksDB entry. The raw RocksDB key combines the
 * current key, its key group, the namespace and the serialized user key; the raw RocksDB
 * value is the serialized user value. Several members below are listed as signatures only;
 * their bodies were omitted from these notes.
 *
 * @param <K> key type of the owning RocksDBKeyedStateBackend
 * @param <N> namespace type
 * @param <UK> type of the map's user keys
 * @param <UV> type of the map's user values
 */
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>> {
// Serializes user keys into the tail of the raw RocksDB key.
private TypeSerializer<UK> userKeySerializer;
// Serializes user values into the raw RocksDB value.
private TypeSerializer<UV> userValueSerializer;
// Private constructor (body omitted in these notes); instances are created via create(...) below.
private RocksDBMapState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
Map<UK, UV> defaultValue,
RocksDBKeyedStateBackend<K> backend);
// Serializer accessors (bodies omitted in these notes).
public TypeSerializer<K> getKeySerializer();
public TypeSerializer<N> getNamespaceSerializer();
public TypeSerializer<Map<UK, UV>> getValueSerializer();
/**
 * Looks up the value mapped to userKey under the current key and namespace.
 *
 * <p>columnFamily and dataInputView are not declared in this excerpt — presumably inherited
 * from AbstractRocksDBState.
 *
 * @param userKey the map key to look up
 * @return the deserialized value, or null if RocksDB holds no entry for the raw key
 */
public UV get(UK userKey){
// Read straight from RocksDB: build the composite raw key, then fetch it from this state's column family.
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes == null
? null
: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}
/**
 * Stores userValue for userKey under the current key and namespace.
 *
 * <p>The value goes through serializeValueNullSensitive — presumably so a null userValue can
 * be written and told apart from a missing entry; confirm in AbstractRocksDBState.
 *
 * @param userKey the map key
 * @param userValue the value to store
 */
public void put(UK userKey, UV userValue){
// Write straight to RocksDB: same composite raw key as get(), value serialized by the user-value serializer.
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); // backend is the RocksDBKeyedStateBackend; backend.db is the database handle it holds (presumably the org.rocksdb.RocksDB instance)
}
// Remaining MapState operations; bodies omitted in these notes.
public void putAll(Map<UK, UV> map);
public void remove(UK userKey);
public boolean contains(UK userKey);
public Iterable<Map.Entry<UK, UV>> entries();
public Iterable<UK> keys();
public Iterable<UV> values();
public boolean isEmpty();
public void clear();
/**
 * Factory that builds a RocksDBMapState from a registered state.
 *
 * <p>The casts are unchecked; callers are presumably responsible for only passing map-state
 * descriptors here.
 *
 * @param stateDesc the state descriptor; its default value becomes the state's default value
 * @param registerResult f0 is the column family to use; f1 is the registered meta info that
 *     supplies the namespace serializer and the state serializer
 * @param backend the keyed state backend that owns this state
 * @return the new state, cast to the requested internal state type IS
 */
static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
registerResult,
RocksDBKeyedStateBackend<K> backend) {
// The keyed state backend is passed into the state here; get()/put() later reach RocksDB through backend.db.
return (IS)
new RocksDBMapState<>(
registerResult.f0,
registerResult.f1.getNamespaceSerializer(),
(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
(Map<UK, UV>) stateDesc.getDefaultValue(),
backend);
}
}
backend与checkpoint
/**
 * A component that can take a snapshot of its state for a checkpoint.
 *
 * @param <S> type of the state object produced by a snapshot
 */
public interface Snapshotable<S extends StateObject> {
/**
 * Starts a snapshot for the given checkpoint.
 *
 * <p>The result is a RunnableFuture; presumably the caller runs it (possibly on another
 * thread) to complete the snapshot and obtain the state object — confirm against the
 * implementing backends.
 *
 * @param checkpointId id of the checkpoint this snapshot belongs to
 * @param timestamp presumably the checkpoint's trigger timestamp
 * @param streamFactory factory for the streams the snapshot data is written to (must not be null)
 * @param checkpointOptions options of this checkpoint (must not be null)
 * @return a future yielding the snapshot's state object of type S
 * @throws Exception if starting the snapshot fails
 */
RunnableFuture<S> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions)
throws Exception;
}
FsStateBackend
- FsStateBackend 的 createKeyedStateBackend 创建的是 HeapKeyedStateBackend（由 HeapKeyedStateBackendBuilder 构建）
- FsStateBackend 的 createOperatorStateBackend 创建的是 DefaultOperatorStateBackend（由 DefaultOperatorStateBackendBuilder 构建）
- DefaultOperatorStateBackend 创建的 state 是 PartitionableListState，它是 State 的实现类
/**
 * Factory for the state backends an operator uses: one method builds the keyed state backend,
 * the other the operator state backend. Implementations are Serializable.
 */
public interface StateBackend extends java.io.Serializable {
/** Returns a display name for this backend; by default the simple name of the implementing class. */
default String getName() {
return this.getClass().getSimpleName();
}
/**
 * Creates the keyed state backend for one operator instance.
 *
 * @param env the task environment
 * @param jobID the job the operator belongs to
 * @param operatorIdentifier identifier of the operator
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups total number of key groups
 * @param keyGroupRange presumably the key groups this backend instance is responsible for
 * @param kvStateRegistry registry for key/value state of this task
 * @param ttlTimeProvider time provider used for state TTL
 * @param metricGroup metric group for the backend's metrics
 * @param stateHandles previously snapshotted keyed state to restore from (must not be null;
 *     presumably empty on a fresh start)
 * @param cancelStreamRegistry registry for closeables, presumably closed on cancellation
 * @return the keyed state backend
 * @throws Exception if the backend cannot be created
 */
<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry)
throws Exception;
/**
 * Creates the operator state backend for one operator instance.
 *
 * @param env the task environment
 * @param operatorIdentifier identifier of the operator
 * @param stateHandles previously snapshotted operator state to restore from (must not be null)
 * @param cancelStreamRegistry registry for closeables, presumably closed on cancellation
 * @return the operator state backend
 * @throws Exception if the backend cannot be created
 */
OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry)
throws Exception;
/** Whether the state backend uses Flink's managed memory. */
default boolean useManagedMemory() {
return false;
}
}
/**
 * Notes excerpt of Flink's {@code FsStateBackend}, showing only the three factory methods
 * discussed here.
 *
 * <ul>
 *   <li>{@code createCheckpointStorage} returns an {@code FsCheckpointStorageAccess} rooted at
 *       this backend's checkpoint and savepoint paths.
 *   <li>{@code createKeyedStateBackend} builds keyed state with {@code
 *       HeapKeyedStateBackendBuilder}.
 *   <li>{@code createOperatorStateBackend} builds operator state with {@code
 *       DefaultOperatorStateBackendBuilder}.
 * </ul>
 */
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

    /**
     * Creates the checkpoint storage for one job.
     *
     * @param jobId the job whose checkpoints and savepoints are stored; checked to be non-null
     * @return storage access rooted at this backend's checkpoint and savepoint paths, using its
     *     minimum file size threshold and write buffer size
     * @throws IOException if creating the storage fails
     */
    @Override
    public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
        checkNotNull(jobId, "jobId");
        return new FsCheckpointStorageAccess(
                getCheckpointPath(),
                getSavepointPath(),
                jobId,
                getMinFileSizeThreshold(),
                getWriteBufferSize());
    }

    /**
     * Creates the keyed state backend for one operator instance.
     *
     * <p>The backend is built by {@code HeapKeyedStateBackendBuilder}. The builder receives the
     * task's local-recovery configuration, a {@code HeapPriorityQueueSetFactory}, a latency
     * tracking configuration bound to {@code metricGroup}, and the compression decorator derived
     * from the job's execution config.
     *
     * @return the built keyed state backend
     * @throws BackendBuildingException if building the backend fails
     */
    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry)
            throws BackendBuildingException {
        TaskStateManager taskStateManager = env.getTaskStateManager();
        LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
        // 128 is kept verbatim from upstream; presumably the minimum capacity of each heap
        // priority queue — confirm against HeapPriorityQueueSetFactory's constructor.
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
                new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
        LatencyTrackingStateConfig latencyTrackingStateConfig =
                latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();
        // Keyed state goes through HeapKeyedStateBackendBuilder, i.e. a HeapKeyedStateBackend.
        return new HeapKeyedStateBackendBuilder<>(
                        kvStateRegistry,
                        keySerializer,
                        env.getUserCodeClassLoader().asClassLoader(),
                        numberOfKeyGroups,
                        keyGroupRange,
                        env.getExecutionConfig(),
                        ttlTimeProvider,
                        latencyTrackingStateConfig,
                        stateHandles,
                        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
                        localRecoveryConfig,
                        priorityQueueSetFactory,
                        isUsingAsynchronousSnapshots(),
                        cancelStreamRegistry)
                .build();
    }

    /**
     * Creates the operator state backend for one operator instance.
     *
     * @return the operator state backend built by {@code DefaultOperatorStateBackendBuilder}
     * @throws BackendBuildingException if building the backend fails
     */
    @Override
    public OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier,
            @Nonnull Collection<OperatorStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry)
            throws BackendBuildingException {
        // Operator state goes through DefaultOperatorStateBackendBuilder, i.e. a
        // DefaultOperatorStateBackend.
        return new DefaultOperatorStateBackendBuilder(
                        env.getUserCodeClassLoader().asClassLoader(),
                        env.getExecutionConfig(),
                        isUsingAsynchronousSnapshots(),
                        stateHandles,
                        cancelStreamRegistry)
                .build();
    }
}
MemoryStateBackend
- MemoryStateBackend 的 createOperatorStateBackend 创建的是 DefaultOperatorStateBackend
- MemoryStateBackend 的 createKeyedStateBackend 创建的是 HeapKeyedStateBackend
- 以 MapState 为例，最终调用 HeapMapState.create 创建 state
flink checkpoint
参考资料
https://www.jianshu.com/p/569a7e67c1b3
https://blog.csdn.net/u010942041/article/details/114944767
https://cloud.tencent.com/developer/article/1792720
https://blog.51cto.com/dataclub/5351042
https://www.cnblogs.com/lighten/p/13234350.html
https://cloud.tencent.com/developer/article/1765572
https://blog.csdn.net/m0_63475429/article/details/127417649
https://blog.csdn.net/Direction_Wind/article/details/125646616