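Preface
I ran into this error while using the Flink Table API, version 1.13.1.
The error is surprising. The Alibaba Cloud documentation for Blink's time functions states that
TO_TIMESTAMP accepts either a timestamp in milliseconds or a string.
In practice, Flink 1.13.1 rejects the millisecond form: as the error below shows, it only accepts character arguments. The Alibaba Cloud page apparently describes Blink's own function set rather than the open-source release.
My SQL is written as follows.
date_time is a 13-digit (millisecond) timestamp, and passing it directly to TO_TIMESTAMP does not work.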
public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE tableName (\n" +
        "`date_time` BIGINT ,\n" +
        "`hs_security_id` VARCHAR ,\n" +
        "`security_id` VARCHAR ,\n" +
        "`num_trades` DECIMAL,\n" +
        "`volume` BIGINT,\n" +
        "`amount` DECIMAL,\n" +
        "`phase_code` BIGINT,\n" +
        "bid_price VARCHAR,\n" +
        "bid_qty VARCHAR,\n" +
        // This computed column fails validation: date_time is BIGINT, not a string.
        "ts AS TO_TIMESTAMP(date_time)," +
        " WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
        ")WITH (\n" +
        " 'connector' = 'kafka', \n" +
        " 'topic'='xxx',\n" +
        " 'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092', \n" +
        " 'format' = 'json',\n" +
        " 'scan.startup.mode' = 'earliest-offset',\n" +
        "'json.fail-on-missing-field' = 'false',\n" +
        " 'json.ignore-parse-errors' = 'true'" +
        ")";
The error message is as follows:
Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 18, column 7 to line 18, column 29: Cannot apply 'TO_TIMESTAMP' to arguments of type 'TO_TIMESTAMP(<BIGINT>)'. Supported form(s): 'TO_TIMESTAMP(<CHARACTER>)'
'TO_TIMESTAMP(<CHARACTER>, <CHARACTER>)'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:106)
at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:140)
at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:256)
at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:956)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:941)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.appendDerivedColumns(MergeTableLikeUtil.java:434)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.access$000(MergeTableLikeUtil.java:209)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:150)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:117)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:70)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
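Solution
A 13-digit timestamp has to be converted like this: divide it by 1000 to get seconds, format the seconds as a string with FROM_UNIXTIME, then pass that string to TO_TIMESTAMP.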
SELECT TO_TIMESTAMP(FROM_UNIXTIME(1513135677000 / 1000, 'yyyy-MM-dd HH:mm:ss'))
SQL Query Result (Table) 2017-12-13T11:27:57
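The conversion above is just integer arithmetic plus formatting, so the same steps can be sketched outside Flink with java.time. This is only an illustration, not what Flink runs internally: the names EpochMillisDemo, fromUnixtime and toTimestamp are made up for this example, and the Asia/Shanghai zone is an assumption chosen because it reproduces the 11:27:57 result shown above. As far as I understand, FROM_UNIXTIME formats in the session time zone, so a different zone would give a different wall-clock string.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class EpochMillisDemo {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Imitates FROM_UNIXTIME(millis / 1000, 'yyyy-MM-dd HH:mm:ss') for a given zone.
    static String fromUnixtime(long epochMillis, ZoneId zone) {
        long epochSeconds = epochMillis / 1000; // integer division drops the millisecond part
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), zone).format(FORMAT);
    }

    // Imitates TO_TIMESTAMP(text, 'yyyy-MM-dd HH:mm:ss').
    static LocalDateTime toTimestamp(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // assumed session time zone
        String formatted = fromUnixtime(1513135677000L, shanghai);
        System.out.println(formatted);                                       // 2017-12-13 11:27:57
        System.out.println(toTimestamp(formatted));                          // 2017-12-13T11:27:57
        System.out.println(fromUnixtime(1513135677000L, ZoneId.of("UTC")));  // 2017-12-13 03:27:57
    }
}
```

Note that dividing by 1000 discards the milliseconds, so a timestamp built this way only has second precision.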
I also tested a string datetime and found that TO_TIMESTAMP does accept a CHARACTER value representing a datetime, for example 2017-12-13 11:27:57.
SELECT TO_TIMESTAMP('2017-12-13 11:27:57', 'yyyy-MM-dd HH:mm:ss')
SQL Query Result (Table) 2017-12-13T11:27:57
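Putting it together, the table definition from the beginning of this post could be changed along these lines. This is a sketch, assuming the expression behaves for the BIGINT column date_time the same way it does for the literal tested above. The class name FixedSnapshotDdl is hypothetical, and the 'xxx' placeholders are kept exactly as in the original.

```java
class FixedSnapshotDdl {
    // Same table as the failing example; only the computed column `ts` is different.
    public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE tableName (\n" +
            "`date_time` BIGINT,\n" +
            "`hs_security_id` VARCHAR,\n" +
            "`security_id` VARCHAR,\n" +
            "`num_trades` DECIMAL,\n" +
            "`volume` BIGINT,\n" +
            "`amount` DECIMAL,\n" +
            "`phase_code` BIGINT,\n" +
            "bid_price VARCHAR,\n" +
            "bid_qty VARCHAR,\n" +
            // milliseconds -> seconds -> formatted string -> TIMESTAMP
            "ts AS TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
            "WATERMARK FOR ts AS ts - INTERVAL '10' SECOND\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'xxx',\n" +
            " 'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'json.fail-on-missing-field' = 'false',\n" +
            " 'json.ignore-parse-errors' = 'true'\n" +
            ")";

    public static void main(String[] args) {
        // Print the DDL; in a real job it would be passed to executeSql on a
        // table environment (the variable name tableEnv is an assumption):
        // tableEnv.executeSql(SOURCE_KAFKA_SNAPSHOT);
        System.out.println(SOURCE_KAFKA_SNAPSHOT);
    }
}
```

The Flink 1.13 documentation also lists TO_TIMESTAMP_LTZ(numeric, precision), which takes epoch milliseconds directly when the precision is 3. I have not tried it in this job, so treat it as an alternative to verify rather than a tested fix.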