Java教程

FlinkSQL实践记录2

本文主要是介绍FlinkSQL实践记录2,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1. 背景

昨天《FlinkSQL实践记录1》对FlinkSql做了简单的使用insert into .. select ..,今天对聚合运算做一些实践。

2. 代码实践

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        // 插入数据
        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                       // "order by name "
        );
        System.out.println(tableResult.getJobClient().get().getJobStatus());

2.1 mysql表不加primary主键

# 注意需要使用bigint, int类型会报错
create table count_info (
name varchar(100),
cnt bigint ) ;

当上游数据不断产生时,会将实时产生的新结果插入mysql

2.2 mysql表添加primary主键

create table count_info (
name varchar(100),
cnt bigint,
primary key(NAME)
) ;

当上游数据不断产生时,会将实时产生的新结果更新至mysql

新生产一批数据后

3. 遇到的问题及解决办法

3.1 sink table缺失主键

Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

解决办法: mysql_sink增加PRIMARY KEY (name) NOT ENFORCED

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

3.2 不能排序

Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

解决办法:去掉order by

        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                      //  "order by name "
        );

4. 不过瘾

接下来对join关联做些实践

这篇关于FlinkSQL实践记录2的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!