Flink SQL (comprising the Table API and the SQL API) is a high-level library built on top of Flink Core that makes structured data processing quick and convenient through SQL.
Once SQL and Table API programs enter Flink, they are translated into a unified representation: the logical plan. The catalog supplies metadata used during subsequent optimization, and the logical plan is the entry point of the optimizer. After a series of rules are applied, Flink optimizes the initial logical plan into a physical plan. The physical plan is translated into Transformations by the code generator, and finally converted into a JobGraph.
There is no separate path for stream versus batch processing in this pipeline: the optimization process and its building blocks are shared by both.
1. Create the Flink SQL execution environment.
2. Register the data source as a table.
3. Execute the SQL query.
4. Write the query result into the target (sink) table.
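The four steps above can be sketched in pure Flink SQL. The table names and connector choices below (a datagen source and a print sink) are illustrative assumptions, not part of the original example:

```sql
-- Steps 1/2: register the source as a table (datagen stands in for a real source)
CREATE TABLE src (
  id   INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Register the target table (print sink for demonstration)
CREATE TABLE dst (
  id   INT,
  name STRING
) WITH (
  'connector' = 'print'
);

-- Steps 3/4: run the query and write the result into the sink
INSERT INTO dst SELECT id, UPPER(name) FROM src;
```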
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.15.2</flink.version>
    <scala.version>2.12.2</scala.version>
    <log4j.version>2.12.1</log4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Web UI for local runs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink / Kafka integration -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>
    <!-- State backend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.3.21</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- CSV format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink filesystem connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink on Hive -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.calcite.avatica</groupId>
                <artifactId>avatica</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.calcite</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.41</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.21</version>
    </dependency>
</dependencies>
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":351619200000,"sal":1250.0,"comm":500.0,"deptno":30}
{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":354988800000,"sal":2975.0,"comm":null,"deptno":20}
{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":370454400000,"sal":1250.0,"comm":1400.0,"deptno":30}
{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":357494400000,"sal":2850.0,"comm":null,"deptno":30}
{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":360864000000,"sal":2450.0,"comm":null,"deptno":10}
{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":553100400000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":374774400000,"sal":5000.0,"comm":null,"deptno":10}
{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":368726400000,"sal":1500.0,"comm":0.0,"deptno":30}
{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":553100400000,"sal":1100.0,"comm":null,"deptno":20}
{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":376156800000,"sal":950.0,"comm":null,"deptno":30}
{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":376156800000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":380563200000,"sal":1300.0,"comm":null,"deptno":10}
public static void main(String[] args) throws Exception {
    // Quick start
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
    // Read the text file and convert each line into an Emp POJO
    // Note: the hiredate timestamp in Emp is of type Long
    // {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
    DataStream<Emp> source = environment.readTextFile("data/emp.txt")
            .map(lines -> JSONObject.parseObject(lines, Emp.class));
    // Convert the Java objects into a Table
    Table table = tableEnv.fromDataStream(source);
    table.select(Expressions.$("*")).execute().print();
}
TableEnvironment is the core concept of the Table API and SQL. It is responsible for:
- Registering Tables in the internal catalog
- Registering external catalogs
- Loading pluggable modules
- Executing SQL queries
- Registering user-defined functions (scalar, table, aggregate)
- Converting between DataStream and Table
A Table is always bound to a specific TableEnvironment; tables from different TableEnvironments cannot be combined in the same query.
Whether the input source is streaming or batch, Table API and SQL queries are translated into DataStream programs.
A Table object's identifier has the form: catalog.database.table.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
An identifier consists of three parts: the catalog name, the database name, and the object name.
If the catalog or database is not specified, the current default is used.
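For example, the defaults can be switched so that unqualified table names resolve against them; the catalog, database, and table names below are hypothetical:

```sql
USE CATALOG my_catalog;   -- set the default catalog
USE my_db;                -- set the default database

-- The following two queries are now equivalent:
SELECT * FROM my_catalog.my_db.orders;
SELECT * FROM orders;
```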
A Table can be virtual (a view) or a regular table. A view is temporary and kept in memory: it disappears when the session ends. A regular table represents a physical table persisted in external storage.
Table classification:
- Temporary tables: exist only within the Flink session.
- Permanent tables: metadata is persisted in a catalog.
- Shadowing: if a temporary table has the same name as a permanent table, the permanent table is inaccessible as long as the temporary table exists; dropping the temporary table makes the permanent table accessible again.
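The shadowing behavior above can be sketched in SQL; the table name and connector option are placeholders:

```sql
-- Permanent table (metadata stored in the catalog)
CREATE TABLE orders (id INT) WITH ('connector' = 'datagen');

-- A temporary table with the same name shadows the permanent one
CREATE TEMPORARY TABLE orders (id INT) WITH ('connector' = 'datagen');

SELECT * FROM orders;          -- resolves to the TEMPORARY table

DROP TEMPORARY TABLE orders;   -- the permanent table becomes visible again
```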
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
Table table = tabEnv.fromDataStream(source);
// Table table = tabEnv.fromDataStream(source, $("deptno").as("dno")); // select one column under an alias
tabEnv.createTemporaryView("t_emp", table);
tabEnv.sqlQuery("select * from t_emp").execute().print();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
// Select the specified column and give it an alias
Table table = tabEnv.fromDataStream(source, $("deptno").as("dno"));
createTemporaryView creates a temporary view (temporary table). The first argument is the name to register ([catalog.][db.]tableName), the second can be either a Table or a DataStream, and the optional third argument specifies the column fields.
Table table = tabEnv.fromDataStream(source);
// Table table = tabEnv.fromDataStream(source, $("deptno").as("dno")); // select one column under an alias
tabEnv.createTemporaryView("t_emp", table);
=========================================================================================
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
// Set an alias and specify the column to query
tabEnv.createTemporaryView("t_emp", source, $("deptno").as("dd"));
tabEnv.sqlQuery("select * from t_emp").execute().print();
- Atomic types: the data types supported by DataStream are also supported by Table, i.e. basic and generic types (Integer, Double, String, etc.).
- Tuple types: fields are named f0, f1, f2, ... counting from f0. All fields can be reordered, and a subset of the fields can be projected out.
- POJO types: Flink also supports "composite types" assembled from multiple data types; the most typical is the plain Java object (POJO). When a POJO DataStream is converted into a Table without specifying field names, the original POJO field names are used directly. POJO fields can be reordered, projected, and renamed.
- Row type: Flink also defines a more general data type for relational tables, the row (Row), which is the basic organizational form of data in a Table. It has a fixed length, but the type of each field cannot be inferred, so concrete type information must be declared when it is used.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.readTextFile("data/dept.txt");
// "Reordering" simply means the selected fields can appear in any order you choose
StreamTableEnvironment.create(environment).fromDataStream(source, $("f1")).execute().print();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
DataStreamSource<Row> source = environment.fromElements(
        Row.ofKind(RowKind.INSERT, "张三", 20),
        Row.ofKind(RowKind.INSERT, "李四", 25),
        // RowKind.UPDATE_BEFORE acts as a marker for the old row
        Row.ofKind(RowKind.UPDATE_BEFORE, "yy", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "aaa", 18));
Table table = tabEnv.fromChangelogStream(source);
table.execute().print();
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as strings but are built up step by step in the host language.
table.groupBy(...).select(...), where groupBy(...) specifies the grouping of the table and select(...) is a projection over the grouped table.
// {"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
Table table = StreamTableEnvironment.create(environment).fromDataStream(source);
table.where($("deptno").isEqual(10)).select($("ename"), $("job")).execute().print();
table.groupBy($("deptno")).select($("deptno"), $("sal").avg().as("sal_avg")).execute().print();
The StreamTableEnvironment object has two commonly used methods: sqlQuery() and executeSql().
- sqlQuery() is mainly used to query data; the resulting Table can be mixed back into further queries.
- executeSql() can be used for any statement: inserts, deletes, updates, and queries alike.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tbl = StreamTableEnvironment.create(environment);
tbl.createTemporaryView("t_emp_demo", source);
String sql = "select deptno,avg(sal) "
        + " from t_emp_demo "
        + " group by deptno ";
tbl.executeSql(sql).print();
=========================================================================================
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
Table empTable = tableEnvironment.fromDataStream(source);
tableEnvironment.sqlQuery("select * from " + empTable).execute().print();
insertInto: a Table is emitted by writing it to a TableSink. TableSink is a generic interface that supports:
- many file formats (e.g. CSV, Apache Parquet, Apache Avro)
- storage systems (e.g. JDBC, Apache HBase, Apache Cassandra, Elasticsearch)
- message queue systems (e.g. Apache Kafka, RabbitMQ)
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.createTemporaryView("t_emp_d", source);
Table tableSource = tblEnv.fromDataStream(source, $("empno"), $("ename"), $("job"));
String sql = "create table t_emp_r("
        + "empno int,"
        + "ename string,"
        + "job string) "
        + "with ( "
        + "'connector'='print')";
tblEnv.executeSql(sql);
tableSource.insertInto("t_emp_r").execute();
// t_emp_r cannot be queried as a source table; it can only serve as a sink
// tblEnv.executeSql("select * from t_emp_r").print();
environment.execute();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlSource = "create table kafka_source( "
        + "deptno int,"
        + "dname String,"
        + "loc String)"
        + "with ("
        + "'connector'='kafka',"
        + "'topic'='flink_kafka_source',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='flink-zwf',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'format'='csv')";
tblEnv.executeSql(sqlSource);
String sqlSink = "create table kafka_sink( "
        + "deptno int,"
        + "dname String,"
        + "loc String)"
        + "with ("
        + "'connector'='kafka',"
        + "'topic'='flink_kafka_sink',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='flink-zwf',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'format'='json')";
tblEnv.executeSql(sqlSink);
// Query one table and insert the result into the other
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").execute();
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").printExplain();
To convert a Table object into a DataStream, call toDataStream() on the table environment directly:
tableEnv.toDataStream(table).print();
For a table that is updated, do not try to convert it directly into a DataStream and print it; instead, record its change log. A table with update operations becomes a changelog stream, which can then be converted to a stream and printed.
Encoding rules: an INSERT is encoded as an add message (+I); a DELETE as a retract message (-D); an UPDATE as a retract message for the old row (-U) followed by an add message for the updated row (+U).
tableEnv.toChangelogStream(table).print();
Flink can connect to databases that use different dialects, such as MySQL, Oracle, PostgreSQL, and Derby (Derby is typically used for testing). The connector provides a mapping table from relational database data types to Flink SQL data types, which makes defining JDBC tables in Flink simpler.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.15.4</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String jdbcSQL = "create table jdbc_scott_emp("
        + "empno int,"
        + "ename string,"
        + "job string,"
        + "mgr int,"
        + "hiredate date,"
        + "sal double,"
        + "comm double,"
        + "deptno int)"
        + "with ("
        + "'connector'='jdbc',"
        + "'url'='jdbc:mysql://master:3306/scott?serverTimezone=Asia/Shanghai',"
        + "'table-name'='emp',"
        + "'driver'='com.mysql.cj.jdbc.Driver',"
        + "'username'='root',"
        + "'password'='Root@123456.')";
tblEnv.executeSql(jdbcSQL);
tblEnv.sqlQuery("select * from jdbc_scott_emp").execute().print();
-- Write data into the JDBC table from another table "T"
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- Use the JDBC table as a dimension table in a temporal join
SELECT *
FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
DataGen is used to generate mock data: the DataGen connector produces rows to read according to data-generation rules.
Complex types (ARRAY, MAP, ROW) are not supported directly; construct them with computed columns.
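A minimal sketch of the computed-column workaround; the table and field names are illustrative:

```sql
CREATE TABLE t_gen (
  a INT,
  b INT,
  -- datagen cannot generate an ARRAY field directly,
  -- but a computed column can build one from generated scalars
  pair AS ARRAY[a, b]
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
```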
Connector options:
// Generate random data according to the configured rules
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlStr = "CREATE TABLE datagen (\n"
        + " f_sequence INT,\n"
        + " f_random INT,\n"
        + " f_random_str STRING,\n"
        + " ts AS localtimestamp,\n"
        + " WATERMARK FOR ts AS ts\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'rows-per-second'='5',\n"
        + " 'fields.f_sequence.kind'='sequence',\n"
        + " 'fields.f_sequence.start'='1',\n"
        + " 'fields.f_sequence.end'='1000',\n"
        + " 'fields.f_random.min'='1',\n"
        + " 'fields.f_random.max'='1000',\n"
        + " 'fields.f_random_str.length'='10'\n"
        + ")";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from datagen").execute().print();
Because Flink performs streaming computation, records with the same key keep arriving. When writing to Kafka, the value for an existing key is continually updated (update: -U/+U markers); a key not seen before is inserted (insert: +I marker); a null value marks a deletion (delete: -D marker). As a sink, the upsert-kafka connector can consume a changelog stream: it writes INSERT/UPDATE_AFTER data as normal Kafka messages, and writes DELETE data as Kafka messages with a null value (indicating that the message for that key is deleted). Flink partitions the data by the values of the primary key columns, which guarantees per-key ordering, so update/delete messages for the same key land in the same partition.
// Use datagen to mock the input data
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String dataGen = "create table t_dataGen("
        + "deptno int,"
        + "salnum int,"
        + "ts AS localtimestamp,"
        + "WATERMARK FOR ts AS ts"
        + ") with ( "
        + "'connector'='datagen',"
        + "'rows-per-second'='2',"
        + "'fields.deptno.min'='88',"
        + "'fields.deptno.max'='99',"
        + "'fields.salnum.min'='10',"
        + "'fields.salnum.max'='20')";
tblEnv.executeSql(dataGen);
// tblEnv.sqlQuery("select deptno,sum(salnum) as salnum from t_dataGen group by deptno").execute().print();
// Kafka sink side
String kafkaSink = "create table upsert_kafka_num("
        + "deptno int,"
        + "salnum int,"
        + "primary key(deptno) not enforced)"
        + "with("
        + "'connector'='upsert-kafka',"
        + "'topic'='upsert_kafka',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'key.format'='csv',"
        + "'value.format'='json')";
tblEnv.executeSql(kafkaSink);
// Insert the aggregated data
tblEnv.executeSql("insert into upsert_kafka_num select deptno,sum(salnum) as salnum from t_dataGen group by deptno");
File systems fall into two categories: the local file system and external file systems.
Local file system: Flink natively supports the file system of the local machine, including any NFS or SAN drives mounted into it; it works out of the box with no extra configuration. Local files are referenced with the file:// URI scheme.
External file systems: common examples include HDFS, ClickHouse, and HBase; these file systems can, and must, be used as plugins.
To use an external file system, before starting Flink copy the corresponding JAR file from the opt directory of the Flink distribution into a subfolder of its plugins directory.
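As an example (the S3 Hadoop filesystem plugin is an assumption here, chosen only to illustrate the layout), the distribution would end up looking like:

```
flink-1.15.2/
├── opt/
│   └── flink-s3-fs-hadoop-1.15.2.jar        <- copy from here ...
└── plugins/
    └── s3-fs-hadoop/
        └── flink-s3-fs-hadoop-1.15.2.jar    <- ... into its own plugins subfolder
```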
public static void main(String[] args) {
    // Set up the environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
    String sqlDemo = "create table t_dept_d("
            + "deptno int,"
            + "dname string,"
            + "loc string)"
            + "with("
            + "'connector'='filesystem',"
            + "'path'='data/dept.txt',"
            + "'format'='csv'"
            + ")";
    tblEnv.executeSql(sqlDemo);
    tblEnv.sqlQuery("select * from t_dept_d").execute().print();
}
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.4</version>
</dependency>
<!-- Also put additional config files such as core-site.xml, hdfs-site.xml and yarn-site.xml into the resources directory -->
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String hdfsSql = "create table dfs_dept("
        + "deptno int,"
        + "dname string,"
        + "loc string)"
        + "with ("
        + "'connector'='filesystem',"
        + "'path'='hdfs://hdfs-zwf/dept.txt',"
        + "'format'='csv')";
tblEnv.executeSql(hdfsSql);
tblEnv.sqlQuery("select * from dfs_dept").execute().print();
Physical columns: fields that come from the schema of the external storage system itself, e.g.:
- fields in the key/value of a Kafka message
- columns of a MySQL table
- columns of a Hive table
- fields of a Parquet file
Computed (expression) columns: apply a SQL expression to the physical columns and define the expression's result as a new column.
// Option 1: SQL API
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlStr = "create table upsert_info("
        + "deptno int,"
        + "salnum2 as salnum*100," // computed column
        + "salnum int)"
        + "with ("
        + "'connector'='kafka',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='zwf',"
        + "'topic'='upsert_kafka',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
// Option 2: Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("deptno", DataTypes.INT())
                .column("salnum", DataTypes.INT())
                .columnByExpression("salplus", "salnum*100")
                .build())
        .option("topic", "upsert_kafka")
        .option("scan.startup.mode", "earliest-offset")
        .option("properties.bootstrap.servers", "node1:9092,master:9092,node2:9092")
        .format("json")
        .build());
tblEnv.sqlQuery("select * from kafka_dept").execute().print();
Metadata columns: expose metadata about the external system that the connector obtains from external storage.
For a Kafka message, the payload in the usual sense lives in the record's key and value, but Kafka also carries metadata such as the partition, offset, and timestamp. Flink's connectors can obtain and expose this metadata, allowing users to define it as columns of a Flink SQL table.
// Option 1: SQL API
String sqlStr = "create table upsert_info("
        + "deptno int,"
        + "salnum2 as salnum*100," // computed column
        + "event_time timestamp_ltz(3) metadata from 'timestamp'," // metadata column
        + "salnum int)"
        + "with ("
        + "'connector'='kafka',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='zwf',"
        + "'topic'='upsert_kafka',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
// Option 2: Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("deptno", DataTypes.INT())
                .column("salnum", DataTypes.INT())
                // metadata columns
                .columnByMetadata("event_time", DataTypes.TIMESTAMP_LTZ(2), "timestamp", true)
                .columnByMetadata("k_offset", DataTypes.INT(), "offset", true)
                .build())
        .option("topic", "upsert_kafka")
        .option("scan.startup.mode", "earliest-offset")
        .option("properties.bootstrap.servers", "node1:9092,master:9092,node2:9092")
        .format("json")
        .build());
tblEnv.sqlQuery("select * from kafka_dept").execute().print();
Single-column primary key constraint syntax:
-- SQL API
id INT PRIMARY KEY NOT ENFORCED,
name STRING

// Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("deptno", DataTypes.INT())
                // declare the primary key column
                .primaryKey("deptno")
                .column("salnum", DataTypes.INT())
                // metadata columns
                .columnByMetadata("event_time", DataTypes.TIMESTAMP_LTZ(2), "timestamp", true)
                .columnByMetadata("k_offset", DataTypes.INT(), "offset", true)
                .build())
        .option("connector", "kafka")
Multi-column primary key constraint syntax:
-- SQL API
id,
name,
PRIMARY KEY(id, name) NOT ENFORCED

// Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("deptno", DataTypes.INT())
                // declare a composite primary key
                .primaryKey("deptno", "event_time")
                .column("salnum", DataTypes.INT())
                // metadata columns
                .columnByMetadata("event_time", DataTypes.TIMESTAMP_LTZ(2), "timestamp", true)
                .columnByMetadata("k_offset", DataTypes.INT(), "offset", true)
                .build())
        .option("connector", "kafka")

// SQL API example
String sqlStr = "create table upsert_info("
        + "deptno int,"
        + "event_time timestamp_ltz(3) metadata from 'timestamp'," // metadata column
        + "dname string,"
        + "loc string,"
        + "primary key(deptno,loc) not enforced)"
        + "with ("
        + "'connector'='upsert-kafka',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='zwf',"
        + "'topic'='flink_kafka_source',"
        + "'key.format'='csv',"
        + "'value.format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
Note: with the kafka connector a primary key must not be declared, whereas the upsert-kafka connector requires one. Primary key columns must not contain null values.
In upsert-kafka mode, neither the key nor the value may be null; otherwise parsing fails in CSV mode.
connector: when connecting to external storage, different format components are needed depending on the data format in the external store.
format component: tells the connector how to parse the data in external storage and map it onto the table schema.
Basic usage steps:
- add the format component's JAR dependency
- specify the format component's name
- set the options the format component requires
<!-- JSON format -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- CSV format -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true'
)

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
)
Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, dynamic tables change over time. They can be queried just like static batch tables. Querying a dynamic table yields a Continuous Query: a continuous query never terminates, and its result is a dynamic table. The query continuously updates its (dynamic) result table to reflect changes on its (dynamic) input tables. In essence, a continuous query on a dynamic table is very similar to the query that defines a materialized view.
Note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
The biggest difference from tables in components such as Spark and Hive is that tables in Flink SQL are dynamic. Flink's core is the processing of bounded or unbounded data streams: a continuous, streaming process.
Evaluating a continuous query on a dynamic table produces a new dynamic table. Unlike a batch query, a continuous query never terminates and updates its result table according to updates on its input tables. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
In the CREATE TABLE DDL, add an extra column and define the event-time attribute with a WATERMARK statement.
The WATERMARK statement defines the watermark generation expression: it marks a column carrying event timestamps as the event-time attribute and specifies the watermark delay relative to it.
// Watermark on the event-time column ts with a 5 s delay
String eventTime = "create table proc_dept_tbl("
        + "deptno int,"
        + "dname string,"
        + "loc string,"
        + "ts timestamp_ltz(3) metadata from 'timestamp',"
        + "watermark for ts as ts-interval '5' second" // ts is the event-time attribute
        + ")with( "
        + "'connector'='kafka',"
        + "'topic'='flink_kafka_sink',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='zwf',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'format'='json')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(eventTime);
tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();
// Table API
tblEnv.createTable("t_water_mark", TableDescriptor.forConnector("kafka")
        .option("topic", "flink_kafka_sink")
        .option("properties.bootstrap.servers", "node1:9092,master:9092,node2:9092")
        .option("properties.group.id", "zwf")
        .option("scan.startup.mode", "earliest-offset")
        .format("json")
        .schema(Schema.newBuilder()
                .column("deptno", DataTypes.INT())
                .column("dname", DataTypes.STRING())
                .column("loc", DataTypes.STRING())
                .columnByMetadata("ts", DataTypes.TIMESTAMP_LTZ(3), "timestamp", true)
                .watermark("ts", "ts-interval '5' second")
                .build())
        .build());
tblEnv.sqlQuery("select deptno,dname,ts from t_water_mark").execute().print();
To define a processing-time attribute, an extra column must be declared specifically to hold the current processing time.
In the CREATE TABLE DDL, add an extra column and assign it the built-in PROCTIME() function to designate the processing-time attribute; the returned type is TIMESTAMP_LTZ.
// Flink SQL: processing-time attribute
String procTime = "create table proc_dept_tbl("
        + "deptno int,"
        + "dname string,"
        + "loc string,"
        + "pt as proctime()" // pt is the processing-time attribute
        + ")with( "
        + "'connector'='kafka',"
        + "'topic'='flink_kafka_sink',"
        + "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092',"
        + "'properties.group.id'='zwf',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'format'='json')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(procTime);
tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();

// The same with the Table API
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(x -> JSONObject.parseObject(x, Emp.class));
Table table = tblEnv.fromDataStream(source, Schema.newBuilder()
        .column("empno", DataTypes.INT())
        .column("ename", DataTypes.STRING())
        .column("job", DataTypes.STRING())
        .column("mgr", DataTypes.INT())
        .column("hiredate", DataTypes.BIGINT())
        .column("sal", DataTypes.DOUBLE())
        .column("comm", DataTypes.DOUBLE())
        .column("deptno", DataTypes.INT())
        .columnByExpression("ts", "proctime()")
        .build());
tblEnv.sqlQuery("select empno,ename,ts from " + table).execute().print();
The processing-time attribute can likewise be defined when converting a DataStream into a table: when calling fromDataStream(), append the .proctime() suffix to designate the processing-time attribute column.
Because the processing time is system time and does not exist in the raw data, the processing-time attribute must never be defined on an existing field; it can only appear as an extra logical column at the very end of the table schema.
// Quick start
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
// Read the text file and convert each line into an Emp POJO
// Note: the hiredate timestamp in Emp is of type Long
// {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
        .map(lines -> JSONObject.parseObject(lines, Emp.class));
// Append the processing-time attribute as the last column
Table table = tableEnv.fromDataStream(source, $("empno"), $("ename"), $("ts").proctime());
table.select($("*")).execute().print();
Window start: the time at which the window opens.
Window end: the time at which the window closes.
Window time: the window end time minus 1 ms.
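These columns are produced by the window table-valued functions; a sketch of selecting them, assuming a table t with a time attribute ts (both names are placeholders):

```sql
SELECT window_start, window_end, window_time, COUNT(*) AS cnt
FROM TABLE(TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '5' SECOND))
GROUP BY window_start, window_end, window_time;
-- window_time = window_end - 1 ms; it carries the time attribute of the
-- result rows, so the result can be windowed again in a follow-up query
```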
Tumbling windows are defined exactly as in the DataStream API: fixed-length, aligned, non-overlapping windows, typically used for periodic statistics.
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset]) takes three required parameters:
- data: a table parameter; the table must contain a time-attribute column.
- timecol: a descriptor indicating which time-attribute column of the data should be mapped to the tumbling windows.
- size: the width of the tumbling window.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
// Randomly generate gid in [10, 20] and sales in [1, 9];
// ts uses the local time, with a 5 s watermark delay
tblEnv.executeSql("CREATE TABLE t_goods (\n"
        + " gid INT,\n"
        + " sales INT,\n"
        + " ts AS localtimestamp,\n"
        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'rows-per-second'='1',\n"
        + " 'fields.gid.min'='10',\n"
        + " 'fields.gid.max'='20',\n"
        + " 'fields.sales.min'='1',\n"
        + " 'fields.sales.max'='9'\n"
        + ")");
// tblEnv.sqlQuery("select * from t_goods").execute().print();
// Tumbling window that fires every 5 s
// String tumbleWin = "select * from table(tumble(table t_goods,descriptor(ts),interval '5' second))";
// tblEnv.sqlQuery(tumbleWin).execute().print();
// Total sales per gid within each time window
tblEnv.sqlQuery(
        "select window_start,window_end,gid,sum(sales) as sum_sales "
                + "from table(tumble(table t_goods,descriptor(ts),interval '5' second)) "
                + "group by window_start,window_end,gid"
).execute().print();
Hopping windows are also known as "sliding windows".
The HOP function assigns windows that cover rows within the size interval and shifts each window according to the time-attribute column.
HOP has three required parameters: HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset])
- data: a table value that has a timestamp column.
- slide: the duration between the starts of consecutive hopping windows.
- size: the duration of the hopping window's width; size must be an integer multiple of slide.
// Sliding (hop) window table-valued function
// Random data: gid in [10, 20], sales in [1, 10]
String datagen = "create table t_datagen(" +
        "gid int," +
        "sales int," +
        "ts as localtimestamp," +
        "watermark for ts as ts-interval '5' second" +
        ") with (" +
        "'connector'='datagen'," +
        "'rows-per-second'='10'," +
        "'fields.gid.min'='10'," +
        "'fields.gid.max'='20'," +
        "'fields.sales.min'='1'," +
        "'fields.sales.max'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(datagen);
// tblEnv.sqlQuery("select * from t_datagen").execute().print();
// Window size 15s, sliding every 3s
tblEnv.sqlQuery("select gid,sum(sales),window_start,window_end from table(hop(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
The CUMULATE function assigns elements to windows that cover rows from an initial step interval, expanding each subsequent window by one more step (keeping the window start fixed) up to the maximum window size.
You can think of CUMULATE as first applying a TUMBLE window of the maximum size, then splitting each tumbling window into several windows that share the same window start but whose window ends differ by one step.
Cumulating windows therefore do overlap, and they do not have a fixed size.
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size) has the following required parameters:
- data: a table parameter; the table must contain a time attribute column.
- timecol: the time attribute column, i.e. which time to window on.
- step: the duration by which the window grows between the ends of consecutive cumulating windows.
- size: the maximum width of the cumulating windows; size must be an integer multiple of step.
// Cumulating window
// Random data: gid in [10, 20], sales in [1, 10]
String datagen = "create table t_datagen(" +
        "gid int," +
        "sales int," +
        "ts as localtimestamp," +
        "watermark for ts as ts-interval '5' second" +
        ") with (" +
        "'connector'='datagen'," +
        "'rows-per-second'='10'," +
        "'fields.gid.min'='10'," +
        "'fields.gid.max'='20'," +
        "'fields.sales.min'='1'," +
        "'fields.sales.max'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(datagen);
// Fire every 3s and accumulate within a 15s maximum window,
// e.g. [19:45:00, 19:45:03), [19:45:00, 19:45:06), ... up to [19:45:00, 19:45:15)
tblEnv.sqlQuery("select window_start,window_end,gid,sum(sales) as sales_sum from table(cumulate(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
GROUP BY + DISTINCT means grouping plus deduplication, which is exactly what UV statistics need.
// Website statistics: UV (unique visitors) and PV (page views)
String websiteSQL = "create table wbSiteNum(" +
        "gid int," +
        "url string," +
        "ts as localtimestamp," +
        "watermark for ts as ts-interval '5' second" +
        ")with(" +
        "'connector'='datagen'," +
        "'fields.gid.min'='1000'," +
        "'fields.gid.max'='2000'," +
        "'fields.url.length'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(websiteSQL);
// tblEnv.sqlQuery("select * from wbSiteNum").execute().print();
tblEnv.sqlQuery(
        "select count(distinct gid) as uv,count(url) as pv\n" +
        "from wbSiteNum"
).execute().print();
Aggregation in SQL is generally done through built-in functions such as SUM, MAX, MIN, AVG, and COUNT.
Their characteristic is computing a single value from multiple input rows: a many-to-one transformation. More often, a GROUP BY clause specifies a grouping key so that the statistic is computed per group, as in the code below.
// Grouped sum
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
        "FROM (VALUES\n" +
        "  ('省1','市1','县1',100),\n" +
        "  ('省1','市2','县2',101),\n" +
        "  ('省1','市2','县1',102),\n" +
        "  ('省2','市1','县4',103),\n" +
        "  ('省2','市2','县1',104),\n" +
        "  ('省2','市2','县1',105),\n" +
        "  ('省3','市1','县1',106),\n" +
        "  ('省3','市2','县1',107),\n" +
        "  ('省3','市2','县2',108),\n" +
        "  ('省4','市1','县1',109),\n" +
        "  ('省4','市2','县1',110))\n" +
        "AS t_person_num(pid, cid, xid,num)\n" +
        "GROUP BY pid").execute().print();
Rollup: dimensions are rolled up from fine granularity to coarse granularity.
// Grouped sum with rollup(pid,cid,xid): granularity from coarse to fine, pid -> cid -> xid
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
        "FROM (VALUES\n" +
        "  ('省1','市1','县1',100),\n" +
        "  ('省1','市2','县2',101),\n" +
        "  ('省1','市2','县1',102),\n" +
        "  ('省2','市1','县4',103),\n" +
        "  ('省2','市2','县1',104),\n" +
        "  ('省2','市2','县1',105),\n" +
        "  ('省3','市1','县1',106),\n" +
        "  ('省3','市2','县1',107),\n" +
        "  ('省3','市2','县2',108),\n" +
        "  ('省4','市1','县1',109),\n" +
        "  ('省4','市2','县1',110))\n" +
        "AS t_person_num(pid, cid, xid,num)\n" +
        "GROUP BY rollup(pid,cid,xid)").execute().print();
CUBE groups by every combination of the dimensions (the "cube" principle): (col1, col2, col3) yields 2^3 groupings.
tableEnvironment.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
        "FROM (VALUES\n" +
        "  ('省1','市1','县1',100),\n" +
        "  ('省1','市2','县2',101),\n" +
        "  ('省1','市2','县1',102),\n" +
        "  ('省2','市1','县4',103),\n" +
        "  ('省2','市2','县1',104),\n" +
        "  ('省2','市2','县1',105),\n" +
        "  ('省3','市1','县1',106),\n" +
        "  ('省3','市2','县1',107),\n" +
        "  ('省3','市2','县2',108),\n" +
        "  ('省4','市1','县1',109),\n" +
        "  ('省4','市2','县1',110))\n" +
        "AS t_person_num(pid, cid, xid, num)\n" +
        "GROUP BY CUBE(pid, cid, xid)").execute().print();
GROUPING SETS lets you define custom dimension groupings. The case below defines four groupings:
(pid, cid, xid), (pid, cid), (pid), ()
// Custom dimension groupings
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
        "FROM (VALUES\n" +
        "  ('省1','市1','县1',100),\n" +
        "  ('省1','市2','县2',101),\n" +
        "  ('省1','市2','县1',102),\n" +
        "  ('省2','市1','县4',103),\n" +
        "  ('省2','市2','县1',104),\n" +
        "  ('省2','市2','县1',105),\n" +
        "  ('省3','市1','县1',106),\n" +
        "  ('省3','市2','县1',107),\n" +
        "  ('省3','市2','县2',108),\n" +
        "  ('省4','市1','县1',109),\n" +
        "  ('省4','市2','县1',110))\n" +
        "AS t_person_num(pid, cid, xid,num)\n" +
        "GROUP BY GROUPING SETS ((pid, cid, xid),(pid, cid),(pid), ())").execute().print();
For example, taking each row as the anchor, we can compute the average of all data within the previous hour, or the average of the previous 10 rows. It is as if a window opens on every row to collect data for the statistic: hence "over (window) functions".
Group aggregation and window TVF aggregation are both many-to-one: each group yields exactly one aggregate result.
An over aggregation, by contrast, is evaluated once for every row, so the number of rows does not shrink after aggregation: a many-to-many relationship.
SELECT
  <aggregate function> OVER (
    [PARTITION BY <field1>[, <field2>, ...]]
    ORDER BY <time attribute field>
    <window range>
  ), ...
FROM ...
1. PARTITION BY (optional)
Specifies the partition key, similar to GROUP BY grouping; this part is optional.
2. ORDER BY (required)
An OVER window is a range of data extended from the current row; the range can be based on time or on row count.
In Flink's stream processing, currently only ascending order by a time attribute is supported, so the field after ORDER BY must be a defined time attribute.
Window range:
1. An over function must also specify the window range, i.e. how many rows to extend over for the aggregation.
2. The range is defined by BETWEEN <lower bound> AND <upper bound>, i.e. "from the lower bound to the upper bound".
3. Currently the upper bound can only be CURRENT ROW, so the range is always "from some earlier row up to the current row".
4. The range can be selected by time or by row count, so there are two modes to choose from:
- Range intervals (RANGE, time-based): prefixed with RANGE, the range is selected by the ORDER BY time field, typically a period of time before the current row's timestamp.
- Row intervals (ROWS, count-based): prefixed with ROWS, the range covers a fixed number of rows before the current row.
Example:
// Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// Create the source table
tableEnvironment.executeSql("CREATE TABLE t_goods (\n" +
        "  gid STRING,\n" +
        "  type INT,\n" +
        "  price INT,\n" +
        "  ts AS localtimestamp,\n" +
        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "  'connector' = 'datagen',\n" +
        "  'rows-per-second'='1',\n" +
        "  'fields.gid.length'='10',\n" +
        "  'fields.type.min'='1',\n" +
        "  'fields.type.max'='5',\n" +
        "  'fields.price.min'='1',\n" +
        "  'fields.price.max'='9'\n" +
        ")");
// Average price per type over the previous 10 seconds up to the current row
tableEnvironment.sqlQuery(
        "select tg.*,avg(price) over(partition by type order by ts range between interval '10' second preceding and current row) as price_avg\n" +
        "from t_goods tg"
).execute().print();
// Average price per type over the previous 10 rows up to the current row
tableEnvironment.sqlQuery(
        "select tg.*,avg(price) over(partition by type order by ts rows between 10 preceding and current row) as price_avg\n" +
        "from t_goods tg"
).execute().print();
In Flink SQL, Top-N is implemented with an OVER aggregation plus a filter condition.
Use the row_number() function to assign each row a rank (row_num) after sorting, then filter with row_num <= N in the outer query to obtain the Top-N result by the sort field.
Flink SQL has a dedicated optimized implementation for this OVER pattern; only in the Top-N scenario may the ORDER BY of an OVER window use fields other than the time attribute. A Top-N query must strictly follow the format below, or the Flink SQL optimizer cannot recognize it. Also, the Table API currently does not support row_number(), so Top-N is only available through the SQL API.
SELECT ...
FROM (
  SELECT ...,
    ROW_NUMBER() OVER (
      [PARTITION BY <field1>[, <field2>...]]
      ORDER BY <sort field1> [asc|desc][, <sort field2> [asc|desc]...]
    ) AS row_num
  FROM ...)
WHERE row_num <= N [AND <other conditions>]
// Top-N via OVER sorting
String dataGenDemo = "create table t_datagen(" +
        "gid string," +
        "price int," +
        "type int," +
        "ts as localtimestamp," +
        "watermark for ts as ts-interval '10' second" +
        ")with(" +
        "'connector'='datagen'," +
        "'fields.gid.length'='10'," +
        "'rows-per-second'='10'," +
        "'fields.price.min'='100'," +
        "'fields.price.max'='999'," +
        "'fields.type.min'='1'," +
        "'fields.type.max'='1')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(dataGenDemo);
// tblEnv.sqlQuery("select * from t_datagen").execute().print();
String topNStr = "select * from\n" +
        "(select d.*,row_number() over(partition by type order by price desc) as row_num from\n" +
        "t_datagen d) where row_num<=3";
tblEnv.sqlQuery(topNStr).execute().print();
=========================================================================================
// Tumbling window of 5s: the top 3 items by price per type in each window
String topNWin = " select *\n" +
        " from(\n" +
        " select *,row_number() over(partition by window_start,window_end,type order by price desc) as row_num\n" +
        " from table(tumble(table t_datagen,descriptor(ts),interval '5' second))\n" +
        " ) where row_num<=3";
tblEnv.sqlQuery(topNWin).execute().print();
=========================================================================================
// Top 3 types by total sales within each 10s window
String topNWinSql = "select * " +
        " from(select type,t_price,window_start,window_end,row_number() over(partition by window_start,window_end order by t_price desc) as row_num\n" +
        " from (\n" +
        " select type,window_start,window_end,sum(price) as t_price\n" +
        " from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
        " group by type,window_start,window_end\n" +
        " ))where row_num<=3";
tblEnv.sqlQuery(topNWinSql).execute().print();
// Top 3 items by price within each type in each 10s window
String topNWinSql2 = "select * " +
        " from (select gid,type,window_start,window_end,row_number() over(partition by window_start,window_end,type order by price desc) as row_num\n" +
        " from (\n" +
        " select *\n" +
        " from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
        " ) )" +
        "where row_num<=3";
tblEnv.sqlQuery(topNWinSql2).execute().print();
As in standard SQL, Flink SQL's regular joins are divided into inner joins and outer joins; the difference lies in whether the result contains rows that fail the join condition. Currently only equality conditions are supported as join conditions: the expression after the keyword ON must assert equality between fields of the two tables.
// Generate two data streams
String dataStr = "create table dataGen_demo(" +
        "gid string," +
        "type int," +
        "price int," +
        "ts1 as localtimestamp," +
        "watermark for ts1 as ts1-interval '5' second" +
        ") with (" +
        "'connector'='datagen'," +
        "'rows-per-second'='1'," +
        "'fields.gid.length'='10'," +
        "'fields.type.min'='1'," +
        "'fields.type.max'='30'," +
        "'fields.price.min'='100'," +
        "'fields.price.max'='999')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(dataStr);
// tblEnv.sqlQuery("select * from dataGen_demo").execute().print();
String dataStr1 = "create table dataGen_demo1(" +
        "type int," +
        "tname string," +
        "price int," +
        "ts2 as localtimestamp," +
        "watermark for ts2 as ts2-interval '5' second" +
        ") with (" +
        "'connector'='datagen'," +
        "'rows-per-second'='1'," +
        "'fields.tname.length'='10'," +
        "'fields.type.kind'='sequence'," +
        "'fields.type.start'='1'," +
        "'fields.type.end'='50'," +
        "'fields.price.min'='300'," +
        "'fields.price.max'='400')";
tblEnv.executeSql(dataStr1);
tblEnv.sqlQuery("select * from dataGen_demo inner join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
LEFT JOIN: left outer join; all rows of the left table are emitted and wait in state for a match. When a match arrives, the earlier unmatched output is retracted and re-emitted with the matching data.
RIGHT JOIN: right outer join; all rows of the right table are emitted, and likewise a later match retracts the earlier unmatched output and re-emits the matched result.
FULL JOIN: rows of both tables are emitted whether or not they match; whenever either side finds a match in state, the previously unmatched output is retracted and the matched result is emitted.
tblEnv.sqlQuery("select * from dataGen_demo left join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
tblEnv.sqlQuery("select * from dataGen_demo full join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
A join of two streams corresponds to a two-table join in SQL, but is a join特性 specific to stream processing.
Flink SQL does not yet support window joins, but interval joins are implemented: on top of the Cartesian product of rows that satisfy the join condition, there is an additional time-interval constraint.
Syntax: an interval join does not use the JOIN keyword; simply list both tables after FROM, separated by a comma. The join condition is defined in the WHERE clause with an equality expression. A cross join followed by WHERE filtering behaves much like INNER JOIN ... ON ...; in the WHERE clause, append the time-interval constraint with AND after the join condition.
String dataStr1 = "create table dataGen_demo1(" +
        "type int," +
        "tname string," +
        "price int," +
        "ts2 as localtimestamp," +
        "watermark for ts2 as ts2-interval '5' second" +
        ") with (" +
        "'connector'='datagen'," +
        "'rows-per-second'='1'," +
        "'fields.tname.length'='10'," +
        "'fields.type.kind'='sequence'," +
        "'fields.type.start'='1'," +
        "'fields.type.end'='50'," +
        "'fields.price.min'='300'," +
        "'fields.price.max'='400')";
tblEnv.executeSql(dataStr1);
tblEnv.sqlQuery("select * from dataGen_demo d,dataGen_demo1 g where d.type=g.type and d.ts1 between g.ts2-interval '5' second and g.ts2+interval '5' second").execute().print();
Flink provides a SQL Client: like Hive's Beeline, it lets us write SQL in the console and submit jobs directly.
The Flink SQL Client can run against a standalone cluster or a YARN cluster; the submission commands differ slightly.
## Start the cluster (assumes the Flink environment variables are already configured)
start-cluster.sh
## Start the client
sql-client.sh embedded
Prerequisite: the Hadoop YARN stack must be up and running.
Each time a YARN session starts, Flink creates a /tmp/.yarn-properties-root file recording the Application ID of the most recently submitted YARN session. Note: the YARN session and the SQL Client must be started by the same user.
## Start a YARN session (assumes the Flink environment variables are already configured)
yarn-session.sh -n 3 -jm 1024 -tm 1024
## Start the client; it must run on the same server node as the command above
sql-client.sh embedded -s yarn-session

## Tests in the client console
select 'hello word';  -- check that the connection works
SELECT name, COUNT(*) AS cnt
FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
GROUP BY name;  -- test data

# Result display modes, set in the client console:
# dedicated paginated table view; use the shortcuts shown at the bottom of
# the screen to page back and forth and to return to the SQL prompt
SET sql-client.execution.result-mode = table;
# changelog view, showing inserts (I), deletes (D) and updates (U)
SET sql-client.execution.result-mode = changelog;
# display close to a traditional database, without a dedicated view
SET sql-client.execution.result-mode = tableau;
If the SQL Client needs third-party dependencies at runtime, copy the jars your project uses into the lib directory of the Flink installation.
For example: flink-connector-kafka_2.11-1.13.2.jar adds Kafka read/write support.
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/tableapi/
The Table API is a unified, relational API for batch and stream processing: Table API queries can run on batch or streaming input without code modifications. The Table API is a superset of the SQL language and is specially designed for Apache Flink. It is a language-integrated API for Scala, Java, and Python: queries are defined in an embedded style in those languages, with IDE support such as autocompletion and syntax validation, rather than being specified as string values like ordinary SQL.
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/overview/
The SQL language Flink supports covers the Data Definition Language (DDL), the Data Manipulation Language (DML), and the query language. Flink's SQL support is based on Apache Calcite, which implements the SQL standard.
In SQL, functions wrap data transformation operations so they can be embedded in queries and invoked uniformly.
Flink's Table API and SQL both provide functions; the two differ slightly in how functions are called:
- Table API functions are invoked as method calls on data objects.
- SQL references a function directly by name, passing data as arguments.
- The Table API is embedded in the Java language; many methods must be added to classes, and comparatively few functions are supported so far.
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/functions/overview/
Flink classifies functions along two axes:
- system functions vs. catalog functions;
- temporary functions vs. persistent functions.
- Combined, this yields four kinds of functions: temporary system functions, system functions, temporary catalog functions, and catalog functions.
Functions can be referenced in two ways, precisely or ambiguously: a precise reference lets users address a function across catalogs and databases by specifying the catalog and database; an ambiguous reference omits them and resolves against the current catalog and database.
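A minimal sketch of the two reference styles (the catalog, database, function, and table names below are made up for illustration):

```sql
-- precise reference: catalog.database.function
SELECT mycatalog.mydb.my_upper(name) FROM t_user;

-- ambiguous reference: resolved against the current catalog and database
SELECT my_upper(name) FROM t_user;
```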
System functions, also called built-in functions, are function modules pre-implemented in the system. They can be called directly by their fixed names to perform the desired transformation, and fall into two broad classes: scalar functions and aggregate functions.
Function categories: scalar functions, aggregate functions, time interval units and time point identifiers, and column functions.
Flink's Table API and SQL provide several interfaces for user-defined functions, defined as abstract classes.
The main kinds of UDFs currently are:
- Scalar function: maps an input scalar value to a new scalar value.
- Table function: maps a scalar value to one or more new rows, i.e. expands it into a table.
- Aggregate function: maps scalar values from multiple rows to one new scalar value.
- Table aggregate function: maps scalar values from multiple rows to one or more new rows.
How to define one: create a class extending the abstract class ScalarFunction and implement an evaluation method named eval().
The scalar function's behavior is determined by this evaluation method, which must be public and must be named eval.
eval can be overloaded multiple times, and any data type can serve as its parameter and return types. Once written, register the class with the table environment and it can be called directly from SQL.
import org.apache.flink.table.functions.ScalarFunction;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 21:34
 */
// Custom scalar function
public class ScalarUDFDemo extends ScalarFunction {
    // Takes a String and returns the string concatenated with its length;
    // the evaluation method must be public and named eval
    public String eval(String input) {
        return input.concat(String.valueOf(input.length()));
    }
}

// Generate mock data
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
tableEnvironment.executeSql("CREATE TABLE t_datagen (\n" +
        "  f_sequence INT,\n" +
        "  f_random INT,\n" +
        "  f_random_str STRING,\n" +
        "  ts AS localtimestamp,\n" +
        "  WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        "  'connector' = 'datagen',\n" +
        "  'rows-per-second'='1',\n" +
        "  'fields.f_sequence.kind'='sequence',\n" +
        "  'fields.f_sequence.start'='1',\n" +
        "  'fields.f_sequence.end'='1000',\n" +
        "  'fields.f_random.min'='1',\n" +
        "  'fields.f_random.max'='1000',\n" +
        "  'fields.f_random_str.length'='10'\n" +
        ")");
// tableEnvironment.sqlQuery("select * from t_datagen").execute().print();
// Option 1: call the function inline through the Table API
// tableEnvironment.from("t_datagen").select(call(ScalarUDFDemo.class, $("f_random_str"))).execute().print();
// Option 2: register the function, then call it from SQL
tableEnvironment.createTemporarySystemFunction("sfsl", ScalarUDFDemo.class);
tableEnvironment.sqlQuery("select sfsl(f_random_str) from t_datagen").execute().print();
How to define one:
To implement a custom table function, create a class extending the abstract class TableFunction; what must be implemented is, again, an evaluation method named eval.
Unlike a scalar function, the TableFunction class itself has a generic parameter T: the type of the rows the table function returns.
The eval() method has no return type and contains no return statement; it emits output rows by calling collect().
1,寻梦环游记,喜剧:8_动画:7_冒险:3_音乐:9_家庭:6
2,至爱梵高,剧情:8_传记:7_动画:3
3,小丑回魂,剧情:6_儿童:7_恐怖:9
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 21:53
 */
/**
 * Row<type STRING, score INT>: output fields named type and score,
 * with data types STRING and INT respectively.
 */
@FunctionHint(output = @DataTypeHint("Row<type STRING,score INT>"))
public class UDFTableFunction extends TableFunction<Row> {
    /**
     * The input is a String such as 喜剧:8_动画:7_冒险:3_音乐:9_家庭:6
     */
    public void eval(String line) {
        String[] split = line.split("_");
        for (String s : split) {
            String[] v = s.split(":");
            collect(Row.of(v[0], Integer.parseInt(v[1])));
        }
    }
}
=========================================================================================
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
// Read the file with the filesystem connector
String fs = "create table t_movie(" +
        "id int," +
        "name string," +
        "types string" +
        ") with (" +
        "'connector'='filesystem'," +
        "'path'='data/movie.txt'," +
        "'format'='csv')";
tblEnv.executeSql(fs);
// tblEnv.sqlQuery("select * from t_movie").execute().print();
// Table API
tblEnv.from("t_movie")
        .joinLateral(call(UDFTableFunction.class, $("types")).as("type", "score"))
        .select($("id"), $("name"), $("type"), $("score"))
        .execute()
        .print();
// SQL API
tblEnv.createTemporarySystemFunction("tbl_f", UDFTableFunction.class);
tblEnv.sqlQuery("select id,name,type,score from t_movie ,lateral table(tbl_f(types))").execute().print();
How to define one:
- A custom aggregate function extends the abstract class AggregateFunction.
- AggregateFunction has two generic parameters: T, the result type of the aggregation, and ACC, the type of the intermediate aggregation state (the accumulator).
- Every AggregateFunction must implement the following methods:
- createAccumulator(): creates the accumulator; it takes no input and returns the accumulator type ACC.
- accumulate(): the core aggregation method, called for every input row. Its first parameter is fixed: the current accumulator, of type ACC, representing the intermediate state of the aggregation.
- getValue(): produces the final result; its input parameter is the accumulator of type ACC and its output type is T. With complex types, Flink's type inference may fail to derive the correct result.
package com.zwf.udf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:31
 */
/**
 * AggregateFunction<Double, Tuple2<Integer, Integer>>: the output type is Double,
 * the intermediate state (accumulator) type is Tuple2<Integer, Integer>.
 * getValue(), createAccumulator() and accumulate() must all be implemented.
 */
public class UDFAggregationDemo extends AggregateFunction<Double, Tuple2<Integer, Integer>> {

    /**
     * Produces the final output value: the weighted average price.
     */
    @Override
    public Double getValue(Tuple2<Integer, Integer> acc) {
        if (acc.f1 == 0) {
            return 0.0;
        }
        return acc.f0 * 1.0 / acc.f1;
    }

    /**
     * Initializes the intermediate state.
     */
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0, 0);
    }

    /**
     * The inputs are two INT values.
     * Without the @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})
     * annotation, the input fields would need a NOT NULL constraint.
     */
    @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})
    public void accumulate(Tuple2<Integer, Integer> acc, Integer weight, Integer price) {
        acc.f0 += weight * price;
        acc.f1 += weight;
    }
}
========================================================================================
package com.zwf.flinkSQL;

import com.zwf.udf.UDFAggregationDemo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:43
 */
public class UDFDemo3 {
    public static void main(String[] args) {
        // Execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
                "  id INT,\n" +
                "  type INT,\n" +
                "  weight INT,\n" +
                "  price INT\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second'='1',\n" +
                "  'fields.id.kind'='sequence',\n" +
                "  'fields.id.start'='1',\n" +
                "  'fields.id.end'='1000',\n" +
                "  'fields.type.min'='1',\n" +
                "  'fields.type.max'='3',\n" +
                "  'fields.weight.min'='10',\n" +
                "  'fields.weight.max'='20',\n" +
                "  'fields.price.min'='100',\n" +
                "  'fields.price.max'='200'\n" +
                ")");
        // Register the function, then compute a weighted average price per type
        tableEnvironment.createTemporarySystemFunction("aggre", UDFAggregationDemo.class);
        tableEnvironment.sqlQuery("select type,aggre(weight,price) from t_order group by type").execute().print();
    }
}
A user-defined table aggregate function (UDTAGG) aggregates one or more rows (i.e. a table) into another table; the result table can have multiple rows and columns.
How to define one:
- createAccumulator(): creates the accumulator, used the same way as in AggregateFunction.
- accumulate(): the core aggregation method, used the same way as in AggregateFunction.
- emitValue(): emits the final result once all input rows are processed. It corresponds to getValue() in AggregateFunction, except that emitValue has no return type; instead it takes two parameters: the accumulator of type ACC, and a "collector" out of type Collector<T> used to emit the output rows.
package com.zwf.udf;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:56
 */
/**
 * TableAggregateFunction<out, acc>: out is the output type, acc the intermediate state type.
 * Tracks the top-2 prices (f0 = highest, f1 = second highest, f2 = "changed" flag).
 */
public class TableAggregateUDF extends TableAggregateFunction<String, Tuple3<Integer, Integer, Boolean>> {

    /**
     * Initializes the intermediate state.
     */
    @Override
    public Tuple3<Integer, Integer, Boolean> createAccumulator() {
        return Tuple3.of(0, 0, false);
    }

    /**
     * @param acc   intermediate state
     * @param price input value
     */
    public void accumulate(Tuple3<Integer, Integer, Boolean> acc, Integer price) {
        if (price > acc.f0) {
            // new maximum: the old maximum becomes the second highest
            acc.f1 = acc.f0;
            acc.f0 = price;
            acc.f2 = true;
        } else if (price > acc.f1) {
            acc.f1 = price;
            acc.f2 = true;
        } else {
            acc.f2 = false;
        }
    }

    /**
     * @param acc intermediate state
     * @param out output collector
     */
    public void emitValue(Tuple3<Integer, Integer, Boolean> acc, Collector<String> out) {
        if (acc.f2) {
            acc.f2 = false;
            out.collect("First[" + acc.f0 + "]Second[" + acc.f1 + "]");
        }
    }
}
=========================================================================================
package com.zwf.flinkSQL;

import com.zwf.udf.TableAggregateUDF;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 23:06
 */
public class UDFDemo4 {
    public static void main(String[] args) {
        // Execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
                "  id INT,\n" +
                "  type INT,\n" +
                "  price INT\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second'='1',\n" +
                "  'fields.id.kind'='sequence',\n" +
                "  'fields.id.start'='1',\n" +
                "  'fields.id.end'='1000',\n" +
                "  'fields.type.min'='1',\n" +
                "  'fields.type.max'='3',\n" +
                "  'fields.price.min'='100',\n" +
                "  'fields.price.max'='200'\n" +
                ")");
        // Plain query
        // tableEnvironment.sqlQuery("select * from t_order").execute().print();
        // Register the function
        tableEnvironment.createTemporarySystemFunction("tafop", TableAggregateUDF.class);
        tableEnvironment.sqlQuery("select type,tafop(price) from t_order group by type").execute().print();
    }
}
CDC is short for Change Data Capture: it captures committed changes from a database and ships them downstream for consumption.
Previously, to get database data in real time, the typical architecture used a third-party tool such as Canal or Debezium to capture the database's change log, publish the changes to a Kafka message queue, and then have components such as Flink or Spark consume from Kafka, compute, and send the results to downstream systems.
In the new architecture, Flink consumes the database's incremental log directly, replacing the original data-capture layer; it computes on the data directly and sends the results downstream.
How it works: when the MySQL CDC source starts, it acquires a global read lock (FLUSH TABLES WITH READ LOCK), which blocks writes to the database. It then reads the current binlog position and the schemas of the databases and tables, after which it releases the global read lock. It then scans the database tables and reads the binlog from the previously recorded position. Flink periodically takes checkpoints that record the binlog position; if a failure occurs, the job restarts and resumes from the binlog position of the last completed checkpoint. This guarantees exactly-once semantics.
Advantages: works out of the box and is easy to get started with; fewer components to maintain, a simpler real-time pipeline, and lower deployment cost; smaller end-to-end latency.
Flink SQL internally supports a complete changelog mechanism, so integrating CDC data only requires converting the CDC data into a form Flink understands.
The refactored TableSource outputs the RowData data structure, representing one row of data. Attached to each RowData is a piece of metadata called the RowKind.
The RowKind covers insert, update-before, update-after, and delete, closely mirroring the binlog concepts in a database.
The JSON captured by Debezium contains the old row, the new row, and source metadata; ingesting Debezium JSON simply means converting that raw JSON into RowData that Flink understands.
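A hedged sketch of how such data is typically ingested: Flink's `debezium-json` format decodes Debezium's JSON envelope into RowData with the corresponding RowKind (the topic name, broker address, and schema below are made up for illustration):

```sql
CREATE TABLE dept_changes (
  deptno INT,
  dname STRING,
  loc STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.scott.dept',                   -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'  -- maps Debezium's c/u/d ops onto insert/update/delete RowKinds
);
```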
Official documentation:
https://github.com/ververica/flink-cdc-connectors
When rows are inserted or updated in a MySQL database, Flink picks up the change in real time, processes it, and forwards it downstream.
See the official documentation above for the list of currently supported databases.
# server ID
server_id=12345
log_bin=/var/lib/mysql/mysql-bin
expire_logs_days=7
# must be ROW
binlog_format=ROW
binlog_cache_size=16M
max_binlog_size=100M
max_binlog_cache_size=256M
relay_log_recovery=1
# must be FULL; this parameter exists since MySQL 5.7
binlog_row_image=FULL
binlog_do_db=scott
DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
  `deptno` int(11) NOT NULL,
  `dname` varchar(255) DEFAULT NULL,
  `loc` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- insert the data only after the job is running
INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');
<!-- Flink CDC dependency -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- the driver must be version 8.0.27 or above -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// Create the CDC source table
tableEnvironment.executeSql("CREATE TABLE flink_cdc_dept (\n" +
        "  deptno INT,\n" +
        "  dname STRING,\n" +
        "  loc STRING,\n" +
        "  PRIMARY KEY(deptno) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'mysql-cdc',\n" +
        "  'hostname' = '192.168.147.120',\n" +
        "  'port' = '3306',\n" +
        "  'username' = 'root',\n" +
        "  'password' = 'Root@123456.',\n" +
        "  'database-name' = 'scott',\n" +
        "  'table-name' = 'dept')");
// Simple query
tableEnvironment.sqlQuery("select * from flink_cdc_dept").execute().print();
A catalog provides metadata: databases, tables, partitions, views, plus the functions and other information stored in a database or other external systems.
Metadata can be temporary, such as temporary tables or UDFs registered through the TableEnvironment, or persistent, such as the metadata in a Hive Metastore. The catalog provides a unified API for managing metadata and making it accessible from Table API and SQL queries.
GenericInMemoryCatalog: an in-memory implementation; all metadata lives only for the lifetime of the session.
JdbcCatalog: connects Flink to a relational database over the JDBC protocol. Postgres Catalog and MySQL Catalog are currently the only two JDBC catalog implementations.
HiveCatalog: serves both as persistent storage for native Flink metadata and as an interface for reading and writing existing Hive metadata.
User-defined catalog: implement the corresponding CatalogFactory interface to develop a custom catalog.
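For instance, a JdbcCatalog can be registered directly in SQL with CREATE CATALOG; a sketch (the catalog name, database, and credentials below are made up):

```sql
CREATE CATALOG my_jdbc WITH (
  'type' = 'jdbc',
  'default-database' = 'scott',
  'username' = 'root',
  'password' = '******',
  'base-url' = 'jdbc:mysql://localhost:3306'
);
USE CATALOG my_jdbc;
```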
Persisting the metadata from Flink's catalog into the metadata database behind the Hive metastore integrates Flink with Hive: just as Spark SQL or Impala operate on data in Hive, Flink can then read and write Hive tables directly.
<!-- Flink on Hive -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.15.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.calcite.avatica</groupId>
            <artifactId>avatica</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.calcite</groupId>
            <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
// Register the HiveCatalog (backed by the Hive metastore) with Flink
tableEnv.registerCatalog("myhive", hive);
// Set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
Flink provides two kinds of optimizers:
- RBO (rule-based optimizer)
- CBO (cost-based optimizer)
Optimizations include:
- subquery decorrelation based on Apache Calcite
- projection pushdown
- partition pruning
- predicate pushdown
- constant folding
- deduplication of sub-plans to avoid recomputation
- special subquery rewrites using left semi-joins and left anti-joins
- optional join reordering, enabled via table.optimizer.join-reorder-enabled
The optimizer makes informed decisions based not only on the plan, but also on rich statistics available from the data sources and on fine-grained per-operator costs (e.g. IO, CPU, network, and memory).
Constant folding: pre-compute constant arithmetic (addition, subtraction, multiplication, division, etc.) in the SQL, so the same constant expression is not re-evaluated over and over at execution time. Before folding: 1+2+t1.value; after folding: 3+t1.value.
Predicate pushdown: filter out the relevant data at the FROM source, reducing the scan range and improving query efficiency.
Projection pushdown: avoid loading unneeded fields by reading only the columns the query actually uses. Loading fields the SQL never touches is wasted work, so the project operation is pushed down to the scan. With columnar storage the gain is even larger, since only the required columns need to be loaded.
Join optimization: when joining two tables, first filter the large table down to the relevant rows (turning it into a smaller input), then join via sort-merge join, hash join, or broadcast hash join; filtering before the join shrinks the Cartesian product.
MiniBatch aggregation: the core idea is to buffer a batch of input records inside the aggregation operator. When the buffered input is triggered for processing, each key needs only one state access. This greatly reduces state overhead and yields better throughput, at the cost of some extra latency, because records are buffered rather than processed immediately: a trade-off between throughput and latency.
Local-Global aggregation addresses data skew by splitting an aggregation into two phases: a local aggregation upstream followed by a global aggregation downstream, similar to the Combine + Reduce pattern in MapReduce.
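Both optimizations are switched on through configuration; a sketch of the usual settings (the values are illustrative, not recommendations):

```sql
SET 'table.exec.mini-batch.enabled' = 'true';       -- enable mini-batch buffering
SET 'table.exec.mini-batch.allow-latency' = '5 s';  -- flush the buffer at most every 5s
SET 'table.exec.mini-batch.size' = '5000';          -- or once 5000 records are buffered
-- two-phase (local + global) aggregation; requires mini-batch to be enabled
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
```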
Split distinct aggregation: first hash-shuffle the distinct key into buckets so a hot key is spread across partitions, then aggregate the per-bucket deduplicated results in a second level:
SELECT day, SUM(cnt)
FROM (
  SELECT day, COUNT(DISTINCT user_id) as cnt
  FROM T
  GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
Use FILTER to restrict which rows each distinct count sees, then group and aggregate:
SELECT
  day,
  COUNT(DISTINCT user_id) AS total_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
-- The most common chore in Flink SQL is converting between time formats,
-- e.g. converting various representations to TIMESTAMP(3):
--   CAST(TO_TIMESTAMP(log_time) AS TIMESTAMP(3)), where log_time = now() (BIGINT epoch millis)
-- localtimestamp returns TIMESTAMP(3); TIMESTAMP without a precision means TIMESTAMP(6)
-- now()          -> 1403006911000  (BIGINT, millisecond epoch value, e.g. 1528257600000)
-- localtimestamp -> 1636272032500  (TIMESTAMP(3), millisecond precision)

-- Conversion functions:
TIMESTAMP(9) TO_TIMESTAMP(BIGINT time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time, STRING format)
BIGINT TIMESTAMP_TO_MS(TIMESTAMP time)
BIGINT TIMESTAMP_TO_MS(STRING time, STRING format)
TO_DATE(CAST(LOCALTIMESTAMP AS VARCHAR))
FROM_UNIXTIME(TIMESTAMP_TO_MS(localtimestamp)/1000, 'yyyy-MM-dd HH:mm:ss')

-- event_time: shift the day boundary to 6 o'clock by offsetting 6 hours
time_pt AS CAST(TO_TIMESTAMP(eventTime - 6 * 3600 * 1000) AS TIMESTAMP(3))