昨天的《FlinkSQL实践记录1》对 FlinkSQL 做了最简单的使用(insert into .. select ..),今天对聚合运算做一些实践。
// Flink sink table mapped onto the MySQL table count_info through the JDBC connector.
// The PRIMARY KEY declaration is required because the GROUP BY query below emits
// update records; without it Flink rejects the job (see the IllegalStateException
// quoted later in this article).
// NOTE(review): connection URL and credentials are hard-coded for the demo; load them
// from configuration in a real job.
String mysql_sql = "CREATE TABLE mysql_sink ("
        + " name STRING,"
        + " cnt BIGINT,"
        + " PRIMARY KEY (name) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC',"
        + " 'table-name' = 'count_info',"
        + " 'username' = 'kafka',"
        + " 'password' = 'Bonc@123'"
        + ")";
tableEnv.executeSql(mysql_sql);

// Insert data: count the rows of sensor_source with id > 3 per name and write the
// result into mysql_sink.
TableResult tableResult = tableEnv.executeSql(
        "INSERT INTO mysql_sink "
                + "SELECT name, count(*) as cnt "
                + "FROM sensor_source "
                + "where id > 3 "
                + "group by name "
        // "order by name "  <- removed: Flink reports "Sort on a non-time-attribute field is not supported."
);

// NOTE(review): getJobStatus() presumably returns a future, so this prints the future
// object rather than the resolved status -- confirm against the Flink version in use.
System.out.println(tableResult.getJobClient().get().getJobStatus());
-- MySQL target table WITHOUT a primary key.
-- Note: cnt must be declared as bigint; using int raises an error
-- (the Flink sink column for count(*) is declared BIGINT).
create table count_info (
    name varchar(100),
    cnt  bigint
);
MySQL 表没有主键时:当上游数据不断产生,每次实时计算出的新结果都会作为新行插入 MySQL。
-- MySQL target table WITH a primary key on name, matching the
-- PRIMARY KEY (name) declared on the Flink sink table mysql_sink.
create table count_info (
    name varchar(100),
    cnt  bigint,
    primary key (name)
);
MySQL 表有主键时:当上游数据不断产生,实时计算出的新结果会按主键更新 MySQL 中已有的行。
新生产一批数据后
Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
解决办法: mysql_sink增加PRIMARY KEY (name) NOT ENFORCED
// Flink sink table definition with the primary key added. Declaring
// PRIMARY KEY (name) NOT ENFORCED resolves the "please declare primary key for sink
// table when query contains update/delete record" error quoted above.
// NOTE(review): connection URL and credentials are hard-coded for the demo; load them
// from configuration in a real job.
String mysql_sql = "CREATE TABLE mysql_sink ("
        + " name STRING,"
        + " cnt BIGINT,"
        + " PRIMARY KEY (name) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC',"
        + " 'table-name' = 'count_info',"
        + " 'username' = 'kafka',"
        + " 'password' = 'Bonc@123'"
        + ")";
Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
解决办法:去掉order by
TableResult tableResult = tableEnv.executeSql( "INSERT INTO mysql_sink " + "SELECT name, count(*) as cnt " + "FROM sensor_source " + "where id > 3 " + "group by name " // "order by name " );
接下来对join关联做些实践