将 DataStream 或 DataSet 注册为 Table
// Example: register a DataStream in the table environment under a table name.
// NOTE: `...` stands for initialization code omitted from this snippet.
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// Register the DataStream as the table "myTable"
tableEnv.registerDataStream("myTable", stream);
// Register the DataStream as the table "myTable2" (with the table fields myLong and myString)
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
将 DataStream 或 DataSet 转换为 Table
// Example: convert a DataStream directly into a Table object.
// NOTE: `...` stands for initialization code omitted from this snippet.
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table, passing the field expression "myLong, myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
将 Table 转换成 DataStream
// Example: convert a Table into append and retract DataStreams.
// NOTE: `...` stands for initialization code omitted from this snippet.
StreamTableEnvironment tableEnv = ...;
// A Table with two fields (name, age)
Table table = ...
// Convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// Convert the Table into an append DataStream of Tuple2<String, Integer>, described by a TupleTypeInfo
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<> (Types.STRING(), Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
// Convert the Table into a retract DataStream of Row
// NOTE(review): the Boolean in Tuple2<Boolean, Row> presumably marks add vs. retract messages -- confirm against the Flink docs
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
将 Table 转换成 DataSet
// Example: convert a Table into DataSets in a batch table environment.
// NOTE: `env` is not defined in this snippet; `...` stands for omitted initialization code.
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// A Table with two fields (name, age)
Table table = ...
// Convert the Table into a DataSet of Row by specifying the class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// Convert the Table into a DataSet of Tuple2<String, Integer>, described by a TupleTypeInfo
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<> (Types.STRING(), Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
SQL Connector
使用代码
// Example: describe an external Kafka/Avro source with connector descriptors
// and register it as the table source "simon".
tableEnvironment
    // Declare the external system to connect to
    .connect(
        new Kafka()
            .version("0.10")
            .topic("goog")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
    )
    // Define the data format
    .withFormat(
        new Avro()
            .avroSchema(
                "{" +
                " \"namespace\": \"com.goog\"," +
                " \"type\": \"record\"," +
                " \"name\": \"UserMessage\"," +
                " \"fields\": [" +
                " {\"name\": \"timestamp\", \"type\": \"string\"}," +
                " {\"name\": \"user\", \"type\": \"long\"}," +
                " {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
                " ]" +
                "}"
            )
    )
    // Define the table schema
    .withSchema(
        new Schema()
            // "rowtime" is declared as a rowtime attribute: timestamps are read from
            // the "timestamp" field, with periodic bounded watermarks (argument 60000)
            .field("rowtime", Types.SQL_TIMESTAMP)
                .rowtime(new Rowtime()
                    .timestampsFromField("timestamp")
                    .watermarksPeriodicBounded(60000)
                )
            .field("user", Types.LONG)
            .field("message", Types.STRING)
    )
    // Specify the update mode of the streaming table
    .inAppendMode()
    // Register the table under the name "simon"
    .registerTableSource("simon");
使用 YAML 文件
使用 DDL
SQL Client
./bin/sql-client.sh embedded