动态表
动态表 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询
- 注意事项
- 动态表首先是一个逻辑概念。在查询执行期间不一定(完全)物化动态表
- 在流上定义的表在内部没有物化
连续查询
一个不会停止的查询,其结果生成一个动态表。查询不断更新结果表,以反映其(动态)输入的数据在表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询。
- 动态表和连续查询的关系。
- 连续查询的分类
- 更新查询: 查询更新先前输出的结果,即定义结果表的 changelog 流包含 INSERT 和 UPDATE 操作
- 追加查询: 查询只附加到结果表,即结果表的 changelog 流只包含 INSERT 操作
- 更新查询
连续查询的任务通常要运行数月的时间,要处理的数据量通常会非常大,如果必须要更新先前的结果的查询,则需要维护的状态会非常大。很可能会导致任务崩溃,所以更新查询要慎用。 - 追加查询
即不需要更新先前查询结果的查询。例如使用窗口统计每小时的用户点击量这样的查询操作。这种查询不需要维护先前查询结果的状态
查询限制
由于有些连续查询要维护的状态时间长,如果不加以控制,很可能会导致因状态过大而导致任务崩溃,为了防止这种情况发生,可能设置状态的清理时间来定时清理过期的状态。通过TableConfig 进行指定。
// 从TableEnvironment获取查询配置
TableConfig tConfig = tableEnv.getConfig();
// 设置查询参数
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
第一个参数(Time.hours(12))表示最小空闲状态保持时间,即状态需要保留的最小时间。
第二个参数(Time.hours(24))表示最大空闲状态保持时间,即状态保留的最大时间。
流是如何转化为动态表的
看一下StreamTableEnvironment的fromDataStream()方法是如何转化成Table的。
@Override
public <T> Table fromDataStream(DataStream<T> dataStream) {
JavaDataStreamQueryOperation<T> queryOperation = asQueryOperation(
dataStream,
Optional.empty());
return createTable(queryOperation);
}
首先将dataStream转换成JavaDataStreamQueryOperation。JavaDataStreamQueryOperation实现自QueryOperation。QueryOperation是面向用户的Table Api操作的基类。重点看一下asQueryOperation方法。
private <T> JavaDataStreamQueryOperation<T> asQueryOperation(
DataStream<T> dataStream,
Optional<List<Expression>> fields) {
TypeInformation<T> streamType = dataStream.getType();
// 获取所有非替换字段的字段名称和类型
//asQueryOperation方法对dataStream的字段类型作了映射,转换成JavaDataStreamQueryOperation
FieldInfoUtils.TypeInfoSchema typeInfoSchema = fields.map(f -> {
FieldInfoUtils.TypeInfoSchema fieldsInfo = FieldInfoUtils.getFieldsInfo(
streamType,
f.toArray(new Expression[0]));
// 检查事件时间是否开启
validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
return fieldsInfo;
}).orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));
return new JavaDataStreamQueryOperation<>(
dataStream,
typeInfoSchema.getIndices(),
typeInfoSchema.toTableSchema());
}
得到JavaDataStreamQueryOperation对象后,将JavaDataStreamQueryOperation对象传给createTable方法,从而得到Table对象,即完成了从流到Table的转换。
protected TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
this,
tableOperation,
operationTreeBuilder,
functionCatalog.asLookup(parser::parseIdentifier));
}
动态表是如何转化为流的
以toRetractStream()为例
@Override
public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo) {
OutputConversionModifyOperation modifyOperation = new OutputConversionModifyOperation(
table.getQueryOperation(),
wrapWithChangeFlag(typeInfo),
OutputConversionModifyOperation.UpdateMode.RETRACT);
return toDataStream(table, modifyOperation);
}
将table首先转换成OutputConversionModifyOperation,OutputConversionModifyOperation为特殊的ModifyOperation类型,它可以将 QueryOperation的树转换为用TypeInformation描述的给定类型的Transformation。 用于将关系查询转换为数据流。(Operation为table操作的基类,Transformation为dataStream操作的基类)
private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) {
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
Transformation<T> transformation = getTransformation(table, transformations);
executionEnvironment.addOperator(transformation);
return new DataStream<>(executionEnvironment, transformation);
}
然后通过translate方法将operation操作树转换成Transformation集合,最终转换成DataStream。
- 表可转换成的流类型
- Append-only 流:仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
- Retract 流(可撤回流): retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。
- upsert 流:包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。
注意:在将动态表转换为 DataStream 时,只支持 append 流和 retract 流
时态表(Temporal Tables)
首先,时态表也是动态表,时态表的每条记录都必须有一个或多个时间字段想关联。时态表通常在业务处理中作为一个变化的维度表的角色。
注意:仅 Blink planner 支持时态表
时态表分类
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
声明版本表
在 Flink 中,定义了主键约束和事件时间属性的表就是版本表。
-- 定义一张版本表
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product_id) NOT ENFORCED, -- (1) 定义主键约束
WATERMARK FOR update_time AS update_time -- (2) 通过 watermark 定义事件时间
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
注意: METADATA FROM ‘value.source.timestamp’ VIRTUAL 语法的意思是从每条 changelog 中抽取 changelog 对应的数据库表中操作的执行时间,强烈推荐使用数据库表中操作的 执行时间作为事件时间 ,否则通过时间抽取的版本可能和数据库中的版本不匹配。
声明版本视图
Flink 也支持定义版本视图只要一个视图包含主键和事件时间便是一个版本视图。
-- 定义一张 append-only 表
CREATE TABLE RatesHistory (
currency_time TIMESTAMP(3),
currency STRING,
rate DECIMAL(38, 10),
WATERMARK FOR currency_time AS currency_time -- 定义事件时间
) WITH (
'connector' = 'kafka',
'topic' = 'rates',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json' -- 普通的 append-only 流
)
定义版本视图
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件时间
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键
ORDER BY currency_time DESC) AS rowNum
FROM RatesHistory )
WHERE rowNum = 1;
注意:通过 LookupableTableSource 定义的表意味着该表具备了在运行时通过一个或多个 key 去查询外部存储系统的能力,当前支持在 基于处理时间的时态表 join 中使用的表包括 JDBC, HBase 和 Hive
时态表函数
时态表函数是一种过时的方式去定义时态表并关联时态表的数据,1.12版本之前用时态表函数去实现时态表的语义,现在我们可以用时态表 DDL 去定义时态表,用时态表 Join 语法去关联时态表。
注册创建时态表函数:
/ 用上面的数据集创建并注册一个示例表
// 在实际设置中,应使用自己的表替换它
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());
tEnv.createTemporaryView("RatesHistory", ratesHistory);
// 创建和注册时态表函数
// 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates);