Table API的特点Table API和SQL都是Apache Flink中高等级的分析API,SQL所具备的特点Table API也都具有,如下:
声明式 - 用户只关心做什么,不用关心怎么做;
高性能 - 支持查询优化,可以获取最好的执行性能;
流批统一 - 相同的统计逻辑,既可以流模式运行,也可以批模式运行;
标准稳定 - 语义遵循SQL标准,语法语义明确,不易变动。
当然除了SQL的特性,因为Table API是在Flink中专门设计的,所以Table API还具有自身的特点:
表达方式的扩展性 - 在Flink中可以为Table API开发很多便捷性功能,如:Row.flatten(), map/flatMap 等
功能的扩展性 - 在Flink中可以为Table API扩展更多的功能,如:Iteration,flatAggregate 等新功能
编译检查 - Table API支持java和scala语言开发,支持IDE中进行编译检查。
Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:
```xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>1.6.1</version> </dependency>
另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加: ```java ```xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.6.1</version> </dependency>
3.2. Table API和SQL程序的结构 Table API一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,再用类似于filter, join, 或者 select关系型转化操作来转化为一个新的Table对象。最后将一个Table对象转回一个DataSet或DataStream。从内部实现上来说,所有应用于Table的转化操作都变成一棵逻辑表操作树,在Table对象被转化回DataSet或者DataStream之后,转化器会将逻辑表操作树转化为对等的DataSet或者DataStream操作符。 Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;所以我们只需要使用一种来演示即可要想执行flink的SQL语句,首先需要获取SQL的执行环境:两种方式(batch和streaming): 批处理: ```java val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
流处理:
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:
在内部目录中注册一个表
注册外部目录
执行SQL查询
注册用户定义的(标量,表格或聚合)函数
转换DataStream或DataSet成Table
持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment
TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统输入表可以从各种来源注册:
现有Table对象,通常是表API或SQL查询的结果。
TableSource,它访问外部数据,例如文件,数据库或消息传递系统。
DataStream或DataSet来自DataStream或DataSet程序。
输出表可以使用注册TableSink。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table “projectedX”
tableEnv.registerTable(“projectedTable”, projTable)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("projectedTable ").select(…)
TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],…)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,…)
// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = CsvTableSource.builder().path("./data/score.csv")... // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)
注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],…)
// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
package com.ccj.pxj.sql import org.apache.flink.core.fs.FileSystem import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.sinks.CsvTableSink object DataSet_DataStreamToTable { case class Order1(id:Long,proudct:String,amount:Int) def main(args: Array[String]): Unit = { //1. 获取流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2. 获取TableEnvironment val tableEnv= TableEnvironment.getTableEnvironment(env) //3. 加载本地集合 val dataStream: DataStream[Order1] = env.fromCollection(List( Order1(1, "beer", 3), Order1(2, "diaper", 4) , Order1(3, "ruber", 2) )) //4. 根据数据注册表 tableEnv.registerDataStream("s",dataStream) //5. 执行SQL val table = tableEnv.sqlQuery("select * from s") //6. 写入CSV文件中 table.printSchema() table.writeToSink(new CsvTableSink("./data/score_sql.csv",",",1,FileSystem.WriteMode.OVERWRITE)) //7. 执行任务 env.execute() } }
Table可以转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就可以基于Table API或者SQL查询的结果来执行了。
当将一个Table转换为DataStream或者DataSet时,你需要指定生成的DataStream或者DataSet的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是Row,下面列表概述了不同选项的功能:
Row:字段通过位置映射、可以是任意数量字段,支持空值,非类型安全访问
POJO:字段通过名称(POJO字段作为Table字段时,必须命名)映射,可以是任意数量字段,支持空值,类型安全访问
Case Class:字段通过位置映射,不支持空值,类型安全访问
Tuple:字段通过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问
Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。
流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。
有两种模式可以将 Table转换为DataStream:
1:Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,之前发送的结果不会被更新。
2:Retract Mode:始终都可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。
package com.ccj.pxj.sql import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{Table, TableEnvironment} import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.api.scala._ object TableTODataStream { def main(args: Array[String]): Unit = { //1. 获取流处理环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 2. 设置并行度 env.setParallelism(1) // 3. 获取Table运行环境 val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) // 4. 加载本地集合 val dataStream: DataStream[(Long, Int, String)] = env.fromCollection(List( (1L, 1, "Hello"), (2L, 2, "Hello"), (6L, 6, "Hello"), (7L, 7, "Hello World"), (8L, 8, "Hello World"), (20L, 20, "Hello World") ) ) // 5. 转换DataStream为Table val table: Table = tableEnv.fromDataStream(dataStream) // 6. 将table转换为DataStream----将一个表附加到流上Append Mode val appendDataStream: DataStream[(Long, Int, String)] = tableEnv.toAppendStream[(Long, Int, String)](table) //7. 将table转换为DataStream----Retract Mode true代表添加消息,false代表撤销消息 val retractDataStream: DataStream[(Boolean, (Long, Int, String))] = tableEnv.toRetractStream[(Long, Int, String)](table) //8. 打印输出 appendDataStream.print() println("----------------------") retractDataStream.print() // 9. 执行任务 env.execute("pxj") } }
package com.ccj.pxj.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api.{Table, TableEnvironment} import org.apache.flink.table.api.scala.BatchTableEnvironment object TableTODataSet { def main(args: Array[String]): Unit = { //1. 获取批处理环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment //2. 设置并行度 env.setParallelism(1) //3. 获取Table运行环境 val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env) //4. 加载本地集合 val dataSet: DataSet[(Long, Int, String)] = env.fromCollection(List( (1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 3, "Hello"), (7L, 7, "Hello World"), (8L, 8, "Hello World"), (20L, 20, "Hello World") )) //5. DataSet转换为Table val table: Table = tableEnv.fromDataSet(dataSet) //6. table转换为dataSet val result: DataSet[(Long, Int, String)] = tableEnv.toDataSet[(Long, Int, String)](table) //7. 打印输出 result.print() println("--------------------------------") println(table) println("---------------") table.printSchema() //env.execute() } }
package com.ccj.pxj.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api.{Table, TableEnvironment} import org.apache.flink.table.api.scala.BatchTableEnvironment import org.apache.flink.types.Row object BatchFlinkSqlDemo { //创建一个样例类Order用来映射数据(订单名、用户名、订单日期、订单金额) case class Order(id:Int, userName:String, createTime:String, money:Double) def main(args: Array[String]): Unit = { //1. 获取一个批处理运行环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment // 2. 获取一个Table运行环境 val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env) // 3. 创建一个样例类`Order`用来映射数据(订单名、用户名、订单日期、订单金额) //4. 基于本地`Order`集合创建一个DataSet source val dataSet: DataSet[Order] = env.fromCollection(List( Order(1, "zhangsan", "2018-10-20 15:30", 358.5), Order(2, "zhangsan", "2018-10-20 16:30", 131.5), Order(3, "lisi", "2018-10-20 16:30", 127.5), Order(4, "lisi", "2018-10-20 16:30", 328.5), Order(5, "lisi", "2018-10-20 16:30", 432.5), Order(6, "zhaoliu", "2018-10-20 22:30", 451.0), Order(7, "zhaoliu", "2018-10-20 22:30", 362.0), Order(8, "zhaoliu", "2018-10-20 22:30", 364.0), Order(9, "zhaoliu", "2018-10-20 22:30", 341.0) )) //5. 使用Table运行环境将DataSet注册为一张表 tableEnv.registerDataSet("t",dataSet) // 6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数) val table: Table = tableEnv.sqlQuery("select userName,sum(money)" + ",max(money)" + ",min(money),count(1) from t group by userName" ) //7. 使用TableEnv.toDataSet将Table转换为DataSet val resultDataSet: DataSet[Row] = tableEnv.toDataSet[Row](table) //8. 打印测试 table.printSchema() println("-----") resultDataSet.print() // env.execute("pxj") } }
package com.ccj.pxj.sql import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.core.fs.FileSystem import org.apache.flink.table.api.{Table, TableEnvironment, Types} import org.apache.flink.table.api.scala.BatchTableEnvironment import org.apache.flink.table.sinks.CsvTableSink import org.apache.flink.table.sources.CsvTableSource object BatchTableDemo { def main(args: Array[String]): Unit = { //1. 获取批处理运行环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment //2. 获取Table运行环境 val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env) //3. 加载外部CSV文件 val csvTableSource: CsvTableSource = CsvTableSource.builder() .path("data/score.csv") //加载文件路径 .field("id", Types.INT) // 列名,类型定义 .field("name", Types.STRING) .field("subjectId", Types.INT) .field("score", Types.DOUBLE) .fieldDelimiter(",") // 属性间分隔符 .lineDelimiter("\n") // 换行符 // .ignoreFirstLine() // 忽略第一行内容 .ignoreParseErrors() // 忽略解析错误 .build() //4. 将外部数据构建成表 tableEnv.registerTableSource("t",csvTableSource) //5. 使用table方式查询数据 val table: Table = tableEnv.scan("t") .select("id,name,subjectId,score") .filter("name='张三'") //6. 打印表结构 table.printSchema() //7. 将数据落地到新的CSV文件中 table.writeToSink(new CsvTableSink("./data/score_table.csv",",",1, FileSystem.WriteMode.OVERWRITE)) // 8. 执行任务 env.execute() } }
流数据处理案例
import java.util.UUID import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{Table, TableEnvironment} import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.types.Row import scala.util.Random object StreamFlinkSqlDemo { // 4. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳) case class Order(id: String, userId: Int, money: Int, createTime: Long) def main(args: Array[String]): Unit = { // 1. 获取流处理运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 获取Table运行环境 val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) // 3. 设置处理时间为`EventTime` env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 5. 创建一个自定义数据源 val orderDataStream: DataStream[Order] = env.addSource(new RichSourceFunction[Order] { override def run(ctx: SourceFunction.SourceContext[Order]): Unit = { // - 使用for循环生成1000个订单 for (i <- 0 until 1000) { // - 随机生成订单ID(UUID) val id = UUID.randomUUID().toString // - 随机生成用户ID(0-2) val userId = Random.nextInt(3) // - 随机生成订单金额(0-100) val money = Random.nextInt(101) // - 时间戳为当前系统时间 val timestamp = System.currentTimeMillis() // 收集数据 ctx.collect(Order(id, userId, money, timestamp)) // - 每隔1秒生成一个订单 TimeUnit.SECONDS.sleep(1) } } override def cancel(): Unit = { } }) // 6. 添加水印,允许延迟2秒 val waterDataStream: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order]() { var currentTimeStamp = 0L // 获取水印 override def getCurrentWatermark: Watermark = { new Watermark(currentTimeStamp - 2000) } // 获取当前时间 override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = { currentTimeStamp = Math.max(element.createTime, previousElementTimestamp) currentTimeStamp } }) // 7. 导入`import org.apache.flink.table.api.scala._`隐式参数 import org.apache.flink.table.api.scala._ // 8. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段 tableEnv.registerDataStream("t_order",waterDataStream,'id, 'userId, 'money, 'createTime.rowtime) // 9. 编写SQL语句统计用户订单总数、最大金额、最小金额 // - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口 val sql = """ |select | userId, | count(1) as totalCount, | max(money) as maxMoney, | min(money) as minMoney | from | t_order | group by | userId, | tumble(createTime, interval '5' second) """.stripMargin // 10. 使用`tableEnv.sqlQuery`执行sql语句 val table: Table = tableEnv.sqlQuery(sql) // 11. 将SQL的执行结果转换成DataStream再打印出来 tableEnv.toAppendStream[Row](table).print() // 12. 启动流处理程序 env.execute("pxj") } }
作者:pxj
日期:2021-07-18