public class TableSchemaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.connect(
new Kafka()
.version("0.10")
.topic("logdata")
.startFromEarliest()
.property("bootstrap.servers", "127.0.0.1:9092")
.property("group.id", "server")
).withFormat(
new Json().failOnMissingField(false).deriveSchema()
).withSchema(
new Schema()
/**
* dataset
* docker.container docker.network docker.memory docker.diskio docker.cpu
* system.cpu system.network system.diskio system.process
**/
.field("host",Types.ROW(new String[]{"name"},new TypeInformation[]{Types.STRING()}))
.field("metricset", Types.ROW(new String[]{"name"},new TypeInformation[]{Types.STRING()}) )
.field("event",Types.ROW(new String[]{"dataset"},new TypeInformation[]{Types.STRING()}))
)
.inAppendMode()
.registerTableSource("server");
Table table = tableEnv.sqlQuery("select * from server ");
tableEnv.toAppendStream(table, Row.class).print();
env.execute();
}
}
flink table kafka connect 使用
猜你喜欢
转载自blog.csdn.net/zb313982521/article/details/100513446
今日推荐
周排行