Let's start with the code:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

val hive_catalog = new HiveCatalog(
  "flink",                 // catalog name
  "default",               // default database
  "G:\\Flink SQL开发文件",  // Hive config (hive-site.xml) directory
  "1.1.0"                  // Hive version
)
// Register and switch to the Hive catalog so the table below is stored in it
// (the registration calls are implied by the discussion that follows)
tableEnv.registerCatalog("flink", hive_catalog)
tableEnv.useCatalog("flink")

// TODO: create the Kafka source table
val createSourceTableSql =
  """CREATE TABLE flink_test_01 (
    |  user_id STRING,
    |  item_id STRING,
    |  category_id STRING,
    |  behavior STRING,
    |  ts TIMESTAMP(3),
    |  proctime AS PROCTIME(),
    |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = '0.11',
    |  'connector.topic' = 'ng_log_par_extracted',
    |  'connector.startup-mode' = 'earliest-offset',
    |  'connector.properties.zookeeper.connect' = 'dev-ct6-dc-worker01:2181,dev-ct6-dc-worker02:2181,dev-ct6-dc-worker03:2181',
    |  'connector.properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
    |  'connector.properties.group.id' = 'test_group',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true',
    |  'update-mode' = 'append'
    |)""".stripMargin

print(createSourceTableSql)
tableEnv.sqlUpdate(createSourceTableSql)
We used the API to turn the Kafka data into a table. When we query this table, we should normally be able to select the 'proctime' field, but here it throws an error.
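For illustration, a minimal query sketch that triggers the failure might look like the following (the SELECT statement and job name are hypothetical; the point is that referencing the computed proctime column of a table stored in the HiveCatalog fails):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Selecting the computed proctime column from the HiveCatalog-backed
// table is what fails under the affected Flink versions.
val result = tableEnv.sqlQuery("SELECT user_id, behavior, proctime FROM flink_test_01")
result.toAppendStream[Row].print()
streamEnv.execute("proctime-query-test")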
If we drop the HiveCatalog at this point, the field itself can be queried without any problem.
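That observation also suggests a workaround until the fix is available: create the table in Flink's built-in in-memory catalog instead of the HiveCatalog. A sketch, assuming the default catalog and database names that Flink ships with (default_catalog / default_database):

// Create the Kafka table in the built-in in-memory catalog instead of
// the HiveCatalog; the proctime column then resolves normally.
tableEnv.useCatalog("default_catalog")
tableEnv.useDatabase("default_database")
tableEnv.sqlUpdate(createSourceTableSql)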
We later found that this is an upstream bug in Flink itself: a known issue, https://issues.apache.org/jira/browse/FLINK-17189, which has since been fixed.
It seems there is still a long road ahead.