前言
先说下数据的特性：我处理的是存放在 ClickHouse 中的历史数据，只有交易日才有数据，且交易日内只有早上 9 点到下午收盘前有数据。行情快照的间隔通常是 5 秒一笔，但也会出现间隔小于 5 秒的情况。使用 DataStream API 处理数据、开 2 分钟的窗口时，由于数据间隔不均匀，处理起来真的很苦恼……
后来改用 Table API（基于事件时间 watermark 的滚动窗口 TUMBLE）解决了这个问题。
使用 Table API 处理数据
1、数据源表
/**
 * Flink SQL DDL for the {@code snapshot} table, which reads JSON market snapshots from the Kafka
 * topic {@code ck.snapshot}, starting at the earliest offset.
 *
 * <p>The computed column {@code ts} is the event-time attribute derived from the
 * {@code date_time} string. The watermark tolerates records arriving up to 5 seconds out of order.
 * Missing JSON fields become NULL, and parse errors are ignored instead of failing the job.
 *
 * <p>Price columns use an explicit precision and scale. A bare {@code DECIMAL} in Flink means
 * {@code DECIMAL(10, 0)}, which would drop the fractional part of every price.
 *
 * <p>{@code security_id}, {@code high_px}, {@code low_px}, {@code last_px}, {@code num_trades}
 * and {@code volume} are declared so that aggregation queries over this table can reference them.
 * NOTE(review): the types chosen for these columns (and the DECIMAL(18, 4) scale) are
 * assumptions; confirm them against the actual JSON payload.
 */
public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE snapshot(\n" +
        "`date_time` VARCHAR ,\n" +
        "`hs_security_id` VARCHAR ,\n" +
        "`security_id` VARCHAR ,\n" +
        "`pre_closepx` DECIMAL(18, 4),\n" +
        "`open_px` DECIMAL(18, 4),\n" +
        "`high_px` DECIMAL(18, 4),\n" +
        "`low_px` DECIMAL(18, 4),\n" +
        "`last_px` DECIMAL(18, 4),\n" +
        "`num_trades` BIGINT,\n" +
        "`volume` BIGINT,\n" +
        "`amount` BIGINT,\n" +
        "`phase_code` BIGINT,\n" +
        "bid_price VARCHAR,\n" +
        "bid_qty VARCHAR,\n" +
        "offer_price VARCHAR,\n" +
        "offer_qty VARCHAR,\n" +
        // If date_time carries epoch milliseconds instead of a formatted string, derive ts with
        // TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss')) instead.
        "ts AS TO_TIMESTAMP(DATE_FORMAT(date_time,'yyyy-MM-dd HH:mm:ss')),\n" +
        " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND \n" +
        ")WITH (\n" +
        " 'connector' = 'kafka', \n" +
        " 'topic'='ck.snapshot',\n" +
        " 'properties.bootstrap.servers' = 'ip:9092', \n" +
        " 'format' = 'json',\n" +
        " 'scan.startup.mode' = 'earliest-offset',\n" +
        " 'json.fail-on-missing-field' = 'false',\n" +
        " 'json.ignore-parse-errors' = 'true'" +
        ")";
2、处理逻辑
// Streaming table environment on the Blink planner.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
String sourceDDL = CustomTable.SOURCE_KAFKA_SNAPSHOT;
String sinkDDL = CustomTable.SNAPSHOT_PRINT;
// Register the source and sink tables.
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);
Table sourceTable = tableEnv.from("snapshot");
// Single definition of the bar size so TUMBLE_START and TUMBLE in GROUP BY cannot drift apart.
String windowInterval = "INTERVAL '30' SECOND";
// Build one 30-second event-time tumbling-window bar per (hs_security_id, security_id).
// Concatenating a Table object into SQL relies on Table.toString() registering it under a
// generated name.
// NOTE(review): in streaming mode FIRST_VALUE/LAST_VALUE follow arrival order, not ts order.
// Confirm that the Kafka records for one security arrive in time order.
Table timeTable = tableEnv.sqlQuery("select \n" +
        "TUMBLE_START(ts, " + windowInterval + ") as begin_ts, \n" +
        " hs_security_id,\n" +
        " security_id,\n" +
        " FIRST_VALUE(pre_closepx) as pre_closepx, \n" +
        " FIRST_VALUE(open_px) as open_px, \n" +
        // A bar's high/low must cover every snapshot in the window, not only the first one.
        // NOTE(review): if low_px can be 0 before the first trade, MIN will pick that 0 up.
        " MAX(high_px) as high_px, \n" +
        " MIN(low_px) as low_px, \n" +
        // The bar's last price is the most recent value in the window, not the maximum one.
        " LAST_VALUE(last_px) as last_px, \n" +
        // Presumably cumulative intraday counters, so MAX yields the latest value; confirm.
        " MAX(num_trades) as num_trades, \n" +
        " MAX(volume) as volume, \n" +
        " MAX(amount) as amount, \n" +
        " FIRST_VALUE(phase_code) as phase_code, \n" +
        " FIRST_VALUE(bid_price) as bid_price, \n" +
        " FIRST_VALUE(bid_qty) as bid_qty, \n" +
        " FIRST_VALUE(offer_price) as offer_price, \n" +
        " FIRST_VALUE(offer_qty) as offer_qty \n" +
        " from " + sourceTable +
        " group by TUMBLE(ts, " + windowInterval + "), hs_security_id, security_id");
// Render the window start as a 'yyyy-MM-dd HH:mm:ss' string and keep every other column as is.
Table formatTable = tableEnv.sqlQuery("select DATE_FORMAT(begin_ts,'yyyy-MM-dd HH:mm:ss') as begin_ts,hs_security_id,security_id,pre_closepx,open_px,high_px,low_px," +
        "last_px,num_trades,volume,amount,phase_code,bid_price,bid_qty,offer_price,offer_qty from " + timeTable);
// Convert the result table into a DataStream. Each element's Boolean flag marks an add (true)
// or a retraction (false).
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(formatTable, Row.class);