在FlinkSQL关联时,必然会涉及到维表,维表又可能是不断变化的(aka 时态表 或 版本表)。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。 普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
未实战测试
将某一个主题设置为compacted topic
bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact
tableEnv.executeSql( "CREATE TABLE dim_source (" + " id STRING," + " name STRING," + " update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " + " WATERMARK FOR update_time AS update_time, " + " PRIMARY KEY (id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'upsert-kafka'," + " 'topic' = 'flinksqldim'," + " 'properties.bootstrap.servers' = 'ip:port'," + " 'properties.group.id' = 'flinksqlDim'," + " 'key.format' = 'json'," + " 'value.format' = 'json')" );
对应的kafka topic生产者代码
// 创建消息 // DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn"); for (int i = 1; i < 5; i++) { JSONObject json1 = new JSONObject(); json1.put("key", i+""); //json.put("update_time", dtf.format(LocalDateTime.now())); JSONObject json = new JSONObject(); json.put("id", i+""); json.put("name", "name222"+i); ProducerRecord<String, String> record = new ProducerRecord<String, String>( "flinksqldim", json1.toJSONString(), json.toJSONString() ); // 发送消息 Future<RecordMetadata> future = producer.send(record); }
kafka支持的METADATA类型
key.format
必须指定key.format
,可以不指定'scan.startup.mode' = 'earliest-offset'
Flink-1.12 - 之Flink SQL 与 kafka connector实践
Flink动态表和时态表总结
基于 FLINK SQL 的实时数据打宽的三种方式
将某一个主题设置为compacted topic
kafka支持的METADATA类型