这里说下这个时间时间的取值,本来我kafka的数据是clickhouse 查询时间特意处理成时间戳。然后使用 TO_TIMESTAMP(date_time) 来设置watermark。 阿里云官网 blink 是支持的,但是这个实际中却不支持。
真的有点狗了。。。。
解决办法如下写法。
public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE tableName (\n" + "`date_time` BIGINT ,\n" + "`hs_security_id` VARCHAR ,\n" + "`security_id` VARCHAR ,\n" + "`pre_close_px` DECIMAL,\n" + "`open_px` DECIMAL,\n" + "`high_px` DECIMAL ,\n" + "`low_px` DECIMAL,\n" + "`last_px` DECIMAL,\n" + "`num_trades` DECIMAL,\n" + "`volume` BIGINT,\n" + "`amount` DECIMAL,\n" + "`phase_code` BIGINT,\n" + "bid_price VARCHAR,\n" + "bid_qty VARCHAR,\n" + "offer_price VARCHAR,\n" + "offer_qty VARCHAR,\n" + "ts AS TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss'))," + " WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" + ")WITH (\n" + " 'connector' = 'kafka', \n" + " 'topic'='xxx',\n" + " 'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092', \n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + "'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'" + ")";
public class OfflineDataAggregationTableApi implements Serializable { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); String sourceDDL = CustomTable.SOURCE_KAFKA_SNAPSHOT; // String sinkDDL = CustomTable.SNAPSHOT_PRINT; //注册source和sink tableEnv.executeSql(sourceDDL); // tableEnv.executeSql(sinkDDL); Table sourceTable = tableEnv.from("snapshot"); Table timeTable = tableEnv.sqlQuery("select \n" + "TUMBLE_START(ts, INTERVAL '15' SECOND), \n" + " hs_security_id,\n" + " security_id,\n" + " MAX(pre_close_px) as pre_close_px, \n" + " MAX(open_px) as open_px, \n" + " MAX(high_px) as high_px, \n" + " FIRST_VALUE(phase_code) as phase_code, \n" + " FIRST_VALUE(bid_price) as bid_price, \n" + " FIRST_VALUE(bid_qty) as bid_qty, \n" + " FIRST_VALUE(offer_price) as offer_price, \n" + " FIRST_VALUE(offer_qty) as offer_qty \n" + " from " + sourceTable + " group by TUMBLE(ts, INTERVAL '15' SECOND),hs_security_id,security_id"); TableResult tableResult = tableEnv.executeSql(" select * from " + timeTable); tableResult.print(); env.execute("快照数据读取"); } }