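1.18.2.8. Integration with the DataStream and DataSet APIs
1.18.2.8.1. Scala implicit conversions
1.18.2.8.2. Creating a view from a DataSet or DataStream
1.18.2.8.3. Converting a DataStream or DataSet into a Table
1.18.2.8.4. Converting a Table into a DataStream or DataSet
1.18.2.8.5. Converting a Table into a DataStream
1.18.2.8.5.1. Converting a Table into a DataSet
1.18.2.8.6. Mapping of data types to Table schema
1.18.2.8.6.1. Atomic types
1.18.2.8.6.2. Tuple types (Scala and Java) and case class types (Scala only)
1.18.2.8.6.3. POJO types (Java and Scala)
1.18.2.8.6.4. Row types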
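In streaming mode, both planners can be integrated with the DataStream API. Only the old planner can be integrated with the DataSet API. In batch mode, the Blink planner cannot be combined with either of the two APIs.
**Note:** The DataSet API discussed below is only relevant for the old planner.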
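Table API and SQL queries can easily be integrated with and embedded into DataStream and DataSet programs. For example, you can query an external table (for example from an RDBMS), do some pre-processing such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream or DataSet API (and any library built on top of these APIs, such as CEP or Gelly). Conversely, a Table API or SQL query can also be applied to the result of a DataStream or DataSet program.
This interaction is achieved by converting a DataStream or DataSet into a Table and vice versa.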
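The Scala Table API provides implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.bridge.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.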
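A DataStream or DataSet can be registered in a TableEnvironment as a view. The schema of the resulting view depends on the data type of the registered DataStream or DataSet. See "Mapping of data types to table schema" (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#mapping-of-data-types-to-table-schema) for details.
**Note:** Views created from a DataStream or DataSet can only be registered as temporary views.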
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        // get a StreamTableEnvironment (Blink planner, streaming mode)
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // registration of a DataSet in a BatchTableEnvironment is equivalent

        // sample input; any DataStream<Tuple2<Long, String>> works here
        DataStream<Tuple2<Long, String>> stream = bsEnv.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));

        // register the DataStream as View "myTable" with fields "f0", "f1"
        tableEnv.createTemporaryView("myTable", stream);

        // register the DataStream as View "myTable2" with fields "myLong", "myString"
        tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    // get a StreamTableEnvironment (Blink planner, streaming mode)
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    // registration of a DataSet is equivalent

    // sample input; any DataStream[(Long, String)] works here
    val stream: DataStream[(Long, String)] = bsEnv.fromElements((1L, "a"), (2L, "b"))

    // register the DataStream as View "myTable" with fields "_1", "_2"
    tableEnv.createTemporaryView("myTable", stream)

    // register the DataStream as View "myTable2" with fields "myLong", "myString"
    tableEnv.createTemporaryView("myTable2", stream, $"myLong", $"myString")
  }
}
```
Instead of registering a DataStream or DataSet in a TableEnvironment, you can also convert it directly into a Table. This is convenient if you want to use the table in a Table API query.
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        // get a StreamTableEnvironment (Blink planner, streaming mode)
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // sample input; any DataStream<Tuple2<Long, String>> works here
        DataStream<Tuple2<Long, String>> stream = bsEnv.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));

        // convert the DataStream into a Table with default fields "f0", "f1"
        Table table1 = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with fields "myLong", "myString"
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
        table2.printSchema();
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    // get a StreamTableEnvironment (Blink planner, streaming mode)
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // sample input; any DataStream[(Long, String)] works here
    val stream: DataStream[(Long, String)] = bsEnv.fromElements((1L, "a"), (2L, "b"))

    // convert the DataStream into a Table with default fields "_1", "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with fields "myLong", "myString"
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
    table2.printSchema()
  }
}
```
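A Table can be converted into a DataStream or DataSet. This way, custom DataStream or DataSet programs can run on the result of a Table API or SQL query.
When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which each row of the Table is converted. Often the most convenient choice is Row. The following list gives an overview of the features of the different options: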
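- Row: fields are mapped by position, arbitrary number of fields, supports null values, no type-safe access.
- POJO: fields are mapped by name (POJO fields must be named like the Table fields), arbitrary number of fields, supports null values, type-safe access.
- Case Class: fields are mapped by position, no support for null values, type-safe access.
- Tuple: fields are mapped by position, limited to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
- Atomic Type: the Table must have a single field, no support for null values, type-safe access.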
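The difference between mapping by position and mapping by name in the list above can be illustrated without Flink. The following is a simplified, self-contained model in plain Java, not Flink's conversion code: a table row is an `Object[]` plus a list of column names. `toTuple` keeps values in column order with a fixed arity and rejects nulls (the Tuple rules), while `toPojo` writes each value into the POJO field whose name equals the column name (the POJO rule). The names `ConversionSketch`, `toTuple`, `toPojo`, and `PersonPojo` are hypothetical.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

// Hypothetical POJO used only by this sketch: no-arg constructor, public fields.
class PersonPojo {
    public String name;
    public Integer age;
}

// Simplified model of two conversion rules; this is not Flink's code.
class ConversionSketch {

    // Tuple-style: values keep their column order (mapped by position),
    // the arity is fixed and null values are rejected.
    static Object[] toTuple(Object[] row, int arity) {
        if (row.length != arity) {
            throw new IllegalArgumentException("expected " + arity + " fields, got " + row.length);
        }
        for (Object value : row) {
            if (value == null) {
                throw new IllegalArgumentException("tuple fields cannot be null");
            }
        }
        return Arrays.copyOf(row, row.length);
    }

    // POJO-style: each column is written to the field with the same name (mapped by name),
    // so column order does not matter; a column without a matching field is rejected.
    static <T> T toPojo(List<String> columns, Object[] row, Class<T> pojoClass) {
        try {
            T pojo = pojoClass.getDeclaredConstructor().newInstance();
            for (int i = 0; i < columns.size(); i++) {
                Field field = pojoClass.getField(columns.get(i));
                field.set(pojo, row[i]);
            }
            return pojo;
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no POJO field named " + e.getMessage(), e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With columns `("age", "name")` and row `{30, "Alice"}`, `toPojo` still sets `name = "Alice"` and `age = 30`, because the column order is irrelevant when mapping by name.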
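The result table of a streaming query is updated dynamically, i.e., it changes as new records arrive on the query's input streams. Hence, converting such a dynamic query result into a DataStream requires encoding the updates of the table.
There are two modes for converting a Table into a DataStream:
1. Append Mode: This mode can only be used if the dynamic Table is modified solely by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
2. Retract Mode: This mode can always be used. It marks INSERT and DELETE changes with a boolean flag.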
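To make the two modes concrete, here is a plain-Java model (no Flink dependency) of the change messages that a continuously updated `SELECT word, COUNT(*) ... GROUP BY word` could produce. Each message is a flag plus a row, mirroring the `Tuple2<Boolean, Row>` encoding of a retract stream: `true` adds a row, `false` retracts a previously emitted one. The classes `Change` and `RetractCountSketch` are hypothetical, and the exact messages Flink emits for a given query may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One message of a retract stream: flag + row (word, count).
final class Change {
    final boolean insert; // true = INSERT (add the row), false = DELETE (retract the row)
    final String word;
    final long count;

    Change(boolean insert, String word, long count) {
        this.insert = insert;
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return "(" + insert + ", " + word + ", " + count + ")";
    }
}

// Simplified model of a continuously updated per-word count.
class RetractCountSketch {

    static List<Change> changelog(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            if (old != null) {
                // the previously emitted result row is no longer valid: retract it first
                out.add(new Change(false, w, old));
            }
            long updated = (old == null) ? 1 : old + 1;
            counts.put(w, updated);
            out.add(new Change(true, w, updated));
        }
        return out;
    }

    // Append mode is only possible if the result never retracts a row.
    static boolean isAppendOnly(List<Change> changes) {
        for (Change c : changes) {
            if (!c.insert) {
                return false;
            }
        }
        return true;
    }
}
```

Calling `changelog(Arrays.asList("a", "b", "a"))` yields `[(true, a, 1), (true, b, 1), (false, a, 1), (true, a, 2)]`. The `false` message is why such a result needs `toRetractStream`; a result that only ever receives `true` messages could also be converted with `toAppendStream`.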
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) throws Exception {
        // get a StreamTableEnvironment (Blink planner, streaming mode)
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Table with two fields (String name, Integer age), built from sample data
        Table table = tableEnv.fromDataStream(
                bsEnv.fromElements(Tuple2.of("Alice", 30), Tuple2.of("Bob", 25)), $("name"), $("age"));

        // convert the Table into an append DataStream of Row by specifying the class
        DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

        // convert the Table into an append DataStream of Tuple2<String, Integer> via a TypeInformation
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);

        // convert the Table into a retract DataStream of Row.
        // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
        // The boolean field indicates the type of change: true is INSERT, false is DELETE.
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);

        // add a sink, then run the DataStream program with execute()
        retractStream.print();
        bsEnv.execute("table-to-datastream");
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    // get a StreamTableEnvironment (Blink planner, streaming mode)
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // Table with two fields (String name, Int age), built from sample data
    val table: Table = tableEnv.fromDataStream(
      bsEnv.fromElements(("Alice", 30), ("Bob", 25)), $"name", $"age")

    // convert the Table into an append DataStream of Row
    val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

    // convert the Table into an append DataStream of (String, Int)
    val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

    // convert the Table into a retract DataStream of Row.
    // A retract stream of type X is a DataStream[(Boolean, X)].
    // The boolean field indicates the type of change: true is INSERT, false is DELETE.
    val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

    // add a sink, then run the DataStream program with execute()
    retractStream.print()
    bsEnv.execute("table-to-datastream")
  }
}
```
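**Note:** The Dynamic Tables document gives a detailed discussion of dynamic tables and their properties.
**Note:** Once a Table has been converted into a DataStream, the resulting DataStream program must be run with the execute method of the StreamExecutionEnvironment.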
A Table is converted into a DataSet as follows:
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) throws Exception {
        // BatchTableEnvironment (old planner only)
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

        // Table with two fields (String name, Integer age), built from sample data
        DataSet<Tuple2<String, Integer>> input = fbEnv.fromElements(Tuple2.of("Alice", 30), Tuple2.of("Bob", 25));
        Table table = tableEnv.fromDataSet(input, $("name"), $("age"));

        // convert the Table into a DataSet of Row by specifying a class
        DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

        // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);

        // add a sink, then run the DataSet program with execute()
        dsTuple.output(new PrintingOutputFormat<>());
        fbEnv.execute("table-to-dataset");
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.api.java.io.PrintingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    // BatchTableEnvironment (old planner only)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // Table with two fields (String name, Int age), built from sample data
    val table: Table = tableEnv.fromDataSet(env.fromElements(("Alice", 30), ("Bob", 25)), $"name", $"age")

    // convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

    // add a sink, then run the DataSet program with execute()
    dsTuple.output(new PrintingOutputFormat[(String, Int)]())
    env.execute("table-to-dataset")
  }
}
```
**Note:** Once a Table has been converted into a DataSet, the resulting DataSet program must be run with the execute method of the ExecutionEnvironment.
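Flink's DataStream and DataSet APIs support a wide variety of data types. Composite types such as tuples (built-in Scala tuples and Flink Java tuples), POJOs, Scala case classes, and Flink's Row type allow nesting and have multiple fields that can be accessed in table expressions. All other types are treated as atomic types. Below, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.
A data type can be mapped to a table schema in two ways: based on field positions or based on field names.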
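Position-based mapping
Position-based mapping gives fields more meaningful names while keeping the field order. It is available for composite data types with a defined field order as well as for atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped by name (see the next section). Fields can be projected out, but they cannot be renamed with an alias (as).
When defining a position-based mapping, the specified names must not exist in the input data type; otherwise, the API assumes that the mapping should be based on field names. If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.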
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // sample input; any DataStream<Tuple2<Long, Integer>> works here
        DataStream<Tuple2<Long, Integer>> stream = fsEnv.fromElements(Tuple2.of(1L, 10), Tuple2.of(2L, 20));

        // convert DataStream into Table with default field names "f0" and "f1"
        Table table = tableEnv.fromDataStream(stream);

        // convert DataStream into Table with field "myLong" only
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"));

        // convert DataStream into Table with field names "myLong" and "myInt"
        Table table3 = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // sample input; any DataStream[(Long, Int)] works here
    val stream: DataStream[(Long, Int)] = bsEnv.fromElements((1L, 10), (2L, 20))

    // convert DataStream into Table with default field names "_1" and "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert DataStream into Table with field "myLong" only
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong")

    // convert DataStream into Table with field names "myLong" and "myInt"
    val table3: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
  }
}
```
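Name-based mapping
Name-based mapping can be used for any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be renamed with an alias (as). Fields can be reordered and projected out.
If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.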
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // sample input; any DataStream<Tuple2<Long, Integer>> works here
        DataStream<Tuple2<Long, Integer>> stream = fsEnv.fromElements(Tuple2.of(1L, 10), Tuple2.of(2L, 20));

        // convert DataStream into Table with default field names "f0" and "f1"
        Table table = tableEnv.fromDataStream(stream);

        // convert DataStream into Table with swapped fields
        Table table2 = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

        // convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
        Table table3 = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // sample input; any DataStream[(Long, Int)] works here
    val stream: DataStream[(Long, Int)] = bsEnv.fromElements((1L, 10), (2L, 20))

    // convert DataStream into Table with default field names "_1" and "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert DataStream into Table with field "_2" only
    val table2: Table = tableEnv.fromDataStream(stream, $"_2")

    // convert DataStream into Table with swapped fields
    val table3: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

    // convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
    val table4: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")
  }
}
```
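The position-based and name-based rules above can be condensed into a small decision procedure. The plain-Java sketch below models the described behavior for an input type with named fields (for example a `Tuple2` with fields `f0`, `f1`): if no requested name exists in the input type and no alias is used, the names rename fields by position; otherwise each name selects an input field, optionally renamed with `as`. Each output entry is written as `outputName<-inputIndex`. This is a simplified illustration with hypothetical names (`SchemaMappingSketch`, `FieldRef`), not Flink's implementation, and it ignores type checks.

```java
import java.util.ArrayList;
import java.util.List;

// A requested field: a name and an optional alias (null when no "as" is used).
final class FieldRef {
    final String name;
    final String alias;

    FieldRef(String name, String alias) {
        this.name = name;
        this.alias = alias;
    }
}

// Simplified model of the documented mapping rules; not Flink's code.
class SchemaMappingSketch {

    // Returns "outputName<-inputIndex" entries describing the resulting schema.
    static List<String> map(List<String> inputFields, List<FieldRef> requested) {
        List<String> result = new ArrayList<>();
        if (requested.isEmpty()) {
            // no names given: keep the default names and order
            for (int i = 0; i < inputFields.size(); i++) {
                result.add(inputFields.get(i) + "<-" + i);
            }
            return result;
        }
        boolean byName = false;
        for (FieldRef ref : requested) {
            if (ref.alias != null || inputFields.contains(ref.name)) {
                byName = true;
            }
        }
        if (!byName) {
            // position-based: the i-th name renames the i-th input field
            if (requested.size() > inputFields.size()) {
                throw new IllegalArgumentException("more names than input fields");
            }
            for (int i = 0; i < requested.size(); i++) {
                result.add(requested.get(i).name + "<-" + i);
            }
            return result;
        }
        // name-based: every name must exist; fields may be reordered, projected and aliased
        for (FieldRef ref : requested) {
            int index = inputFields.indexOf(ref.name);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field " + ref.name);
            }
            String output = (ref.alias != null) ? ref.alias : ref.name;
            result.add(output + "<-" + index);
        }
        return result;
    }
}
```

For input fields `f0, f1`, the request `f1 as myInt, f0 as myLong` yields `[myInt<-1, myLong<-0]`, the same schema as `table3` in the Java name-based example, while `myLong, myInt` yields `[myLong<-0, myInt<-1]` by position. Mixing an existing name with an unknown one (`f1, myInt`) is rejected in this model.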
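Flink treats primitive types (Integer, Double, String) and generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type, and the attribute can be renamed.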
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // sample input of an atomic type
        DataStream<Long> stream = fsEnv.fromElements(1L, 2L, 3L);

        // convert DataStream into Table with default field name "f0"
        Table table = tableEnv.fromDataStream(stream);

        // convert DataStream into Table with field name "myLong"
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"));
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // sample input of an atomic type
    val stream: DataStream[Long] = bsEnv.fromElements(1L, 2L, 3L)

    // convert DataStream into Table with default field name "f0"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert DataStream into Table with field name "myLong"
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong")
  }
}
```
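Flink supports Scala's built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (position-based mapping). If no field names are specified, the default field names are used. If the original field names (f0, f1, ... for Flink tuples and _1, _2, ... for Scala tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows fields to be reordered and projected, and renamed with an alias (as).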
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // sample input; any DataStream<Tuple2<Long, String>> works here
        DataStream<Tuple2<Long, String>> stream = fsEnv.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));

        // convert DataStream into Table with default field names "f0", "f1"
        Table table1 = tableEnv.fromDataStream(stream);

        // convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));

        // convert DataStream into Table with reordered fields "f1", "f0" (name-based)
        Table table3 = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

        // convert DataStream into Table with projected field "f1" (name-based)
        Table table4 = tableEnv.fromDataStream(stream, $("f1"));

        // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
        Table table5 = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as("myLong"));
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// case class declared at top level (outside main)
case class Person(name: String, age: Int)

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // sample input; any DataStream[(Long, String)] works here
    val stream: DataStream[(Long, String)] = bsEnv.fromElements((1L, "a"), (2L, "b"))

    // convert DataStream into Table with default field names "_1", "_2"
    val table1: Table = tableEnv.fromDataStream(stream)
    // convert DataStream into Table with field names "myLong", "myString" (position-based)
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
    // convert DataStream into Table with reordered fields "_2", "_1" (name-based)
    val table3: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
    // convert DataStream into Table with projected field "_2" (name-based)
    val table4: Table = tableEnv.fromDataStream(stream, $"_2")
    // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
    val table5: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")

    val streamCC: DataStream[Person] = bsEnv.fromElements(Person("Alice", 30), Person("Bob", 25))
    // convert DataStream into Table with default field names "name", "age"
    val table6: Table = tableEnv.fromDataStream(streamCC)
    // convert DataStream into Table with field names "myName", "myAge" (position-based)
    val table7: Table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")
    // convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
    val table8: Table = tableEnv.fromDataStream(streamCC, $"age" as "myAge", $"name" as "myName")
  }
}
```
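Flink supports POJOs as composite types.
When a DataStream or DataSet of a POJO type is converted into a Table without specifying field names, the names of the original POJO fields are used. Name-based mapping requires the original names and cannot be done by position. Fields can be renamed with an alias (the as keyword), reordered, and projected.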
Java version:

```java
package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {

    // Person is a POJO with fields "name" and "age":
    // public class, public no-argument constructor, public fields
    public static class Person {
        public String name;
        public Integer age;

        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        DataStream<Person> stream = fsEnv.fromElements(new Person("Alice", 30), new Person("Bob", 25));

        // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
        Table table1 = tableEnv.fromDataStream(stream);

        // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
        Table table2 = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));

        // convert DataStream into Table with projected field "name" (name-based)
        Table table3 = tableEnv.fromDataStream(stream, $("name"));

        // convert DataStream into Table with projected and renamed field "myName" (name-based)
        Table table4 = tableEnv.fromDataStream(stream, $("name").as("myName"));
    }
}
```
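Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Person is a POJO with field names "name" and "age":
// mutable fields (var) plus a public no-argument constructor
class Person(var name: String, var age: Int) {
  def this() = this(null, 0)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val stream: DataStream[Person] = bsEnv.fromElements(new Person("Alice", 30), new Person("Bob", 25))

    // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
    val table1: Table = tableEnv.fromDataStream(stream)
    // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
    val table2: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")
    // convert DataStream into Table with projected field "name" (name-based)
    val table3: Table = tableEnv.fromDataStream(stream, $"name")
    // convert DataStream into Table with projected and renamed field "myName" (name-based)
    val table4: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
  }
}
```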
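The comment "fields are ordered by name!" in the POJO examples means that the default schema does not follow declaration order: a POJO declared as (name, age) yields the default fields (age, name). The JDK-only sketch below reproduces just that ordering rule with reflection. The class names `PojoOrderSketch` and `OrderedPerson` are hypothetical; the real POJO analysis in Flink's type extractor does much more (constructor, getter/setter, and superclass checks), which this sketch deliberately omits.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical POJO declared with "name" before "age".
class OrderedPerson {
    public static final String KIND = "person"; // static: not part of a record's fields
    public String name;
    public Integer age;
}

// Models only the field-ordering rule for POJOs; not Flink's type extractor.
class PojoOrderSketch {

    // Returns the instance field names of the class, sorted alphabetically.
    // Superclass fields are ignored here for brevity.
    static List<String> defaultFieldNames(Class<?> pojoClass) {
        List<String> names = new ArrayList<>();
        for (Field field : pojoClass.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue; // skip static and transient fields
            }
            names.add(field.getName());
        }
        Collections.sort(names); // ordered by name, not by declaration
        return names;
    }
}
```

`defaultFieldNames(OrderedPerson.class)` returns `[age, name]`, matching the default schema shown in the comments of the POJO examples.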
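The Row type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a DataStream or DataSet of Row into a Table. Fields of a Row can be mapped both by position and by name. Fields can be renamed by providing names for all fields (position-based mapping), or selected individually for projection, reordering, and renaming (name-based mapping).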
Java version:

```java
package com.toto.demo.sql;

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // DataStream of Row with two fields "name" and "age" specified in a RowTypeInfo
        RowTypeInfo rowType = new RowTypeInfo(
                new TypeInformation<?>[]{Types.STRING, Types.INT}, new String[]{"name", "age"});
        // Row fields may be null
        DataStream<Row> stream = fsEnv.fromCollection(
                Arrays.asList(Row.of("Alice", 30), Row.of("Bob", null)), rowType);

        // convert DataStream into Table with default field names "name", "age"
        Table table1 = tableEnv.fromDataStream(stream);

        // convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
        Table table2 = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));

        // convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
        Table table3 = tableEnv.fromDataStream(stream, $("name").as("myName"), $("age").as("myAge"));

        // convert DataStream into Table with projected field "name" (name-based)
        Table table4 = tableEnv.fromDataStream(stream, $("name"));

        // convert DataStream into Table with projected and renamed field "myName" (name-based)
        Table table5 = tableEnv.fromDataStream(stream, $("name").as("myName"));
    }
}
```
Scala version:

```scala
package com.toto.learn.sql

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // DataStream of Row with two fields "name" and "age" specified in a RowTypeInfo
    val rowType = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("name", "age"))
    // the RowTypeInfo is passed explicitly as the TypeInformation of the stream
    val stream: DataStream[Row] =
      bsEnv.fromCollection(Seq(Row.of("Alice", Int.box(30)), Row.of("Bob", null)))(rowType)

    // convert DataStream into Table with default field names "name", "age"
    val table1: Table = tableEnv.fromDataStream(stream)
    // convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
    val table2: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")
    // convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
    val table3: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")
    // convert DataStream into Table with projected field "name" (name-based)
    val table4: Table = tableEnv.fromDataStream(stream, $"name")
    // convert DataStream into Table with projected and renamed field "myName" (name-based)
    val table5: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
  }
}
```