<!-- Flink 1.14.0 Table API dependencies (Scala 2.11 builds where applicable), all in provided scope --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency> <!-- Needed to run the jobs inside IntelliJ IDEA --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency> <!-- Needed to implement a custom format or connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency>
apiTest\TableBatchDemo.scala
package apiTest

import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}

/**
 * Batch-mode Table API demo: builds an in-memory table of orders and keeps
 * only the rows whose `amount` is greater than or equal to 8.
 */
object TableBatchDemo {

  def main(args: Array[String]): Unit = {
    // The Blink planner is the only planner since Flink 1.14, so the
    // deprecated useBlinkPlanner() call is no longer needed.
    val settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Row type of the in-memory input: (id BIGINT, product STRING, amount INT).
    val orderRowType = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val table = tEnv.fromValues(orderRowType,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    val filtered = table.where($("amount").isGreaterOrEqual(8))

    // execute() runs the job; print() collects the result rows back to the
    // client and renders them as a table on stdout.
    filtered.execute().print()
  }
}
结果如下:
+----------------------+--------------------------------+-------------+
|                   id |                        product |      amount |
+----------------------+--------------------------------+-------------+
|                    2 |                          Tesla |           8 |
|                    2 |                          Tesla |           8 |
|                    3 |                            BYD |          20 |
+----------------------+--------------------------------+-------------+
3 rows in set
apiTest\TableStreamDemo.scala
package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}

/**
 * Streaming Table API demo: turns a DataStream of words into a table, keeps
 * the words matching LIKE '%t%', prints the JSON execution plan, and prints
 * the matching rows to stdout.
 */
object TableStreamDemo {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // The Blink planner is the only planner since Flink 1.14, so the
    // deprecated useBlinkPlanner() call is no longer needed.
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    // A plain TableEnvironment.create(bsSettings) would not offer
    // fromDataStream, so the DataStream-bridging environment is used.
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    // Never reassigned, hence a val. WordSourceFunction is defined elsewhere in the project.
    val dataStream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(dataStream, $("word"))
    val filtered = table.where($("word").like("%t%"))

    println(filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN))

    // Implicit TypeInformation consumed by toAppendStream to convert the
    // single-column result back into a DataStream of Row.
    implicit val rowStringType = ROW(STRING)
    tEnv.toAppendStream(filtered)
      .print("table")

    senv.execute()
  }
}
结果如下:
== Abstract Syntax Tree == LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')]) +- LogicalTableScan(table=[[Unregistered_DataStream_1]]) == Optimized Physical Plan == Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')]) +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word]) == Optimized Execution Plan == Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')]) +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word]) == Physical Execution Plan == { "nodes" : [ { "id" : 1, "type" : "Source: Custom Source", "pact" : "Data Source", "contents" : "Source: Custom Source", "parallelism" : 1 }, { "id" : 2, "type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])", "pact" : "Operator", "contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])", "parallelism" : 1, "predecessors" : [ { "id" : 1, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 3, "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])", "pact" : "Operator", "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])", "parallelism" : 1, "predecessors" : [ { "id" : 2, "ship_strategy" : "FORWARD", "side" : "second" } ] } ] } table:6> +I[stream] table:7> +I[table] table:8> +I[batch] table:1> +I[batch] ......省略部分......
apiTest\SqlBatchDemo.scala
package apiTest

import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}

/**
 * Batch-mode SQL demo: registers an in-memory orders table as a view and
 * sums `amount` per `product` with a SQL query.
 */
object SqlBatchDemo {

  def main(args: Array[String]): Unit = {
    // The Blink planner is the only planner since Flink 1.14, so the
    // deprecated useBlinkPlanner() call is no longer needed.
    val settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Row type of the in-memory input: (id BIGINT, product STRING, amount INT).
    val orderRowType = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val input = tEnv.fromValues(orderRowType,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    // Register the table under a name so the SQL text below can reference it.
    tEnv.createTemporaryView("myOrder", input)

    val table = tEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")
    table.execute().print()
  }
}
结果如下:
+--------------------------------+-------------+
|                        product |      amount |
+--------------------------------+-------------+
|                          Tesla |          16 |
|                            BYD |          20 |
|                            BMW |           1 |
+--------------------------------+-------------+
3 rows in set
apiTest\SqlStreamDemo.scala
package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}

/**
 * Streaming SQL demo: queries a DataStream of words with SQL, keeping the
 * words matching LIKE '%t%', prints the execution plan and runs the job.
 */
object SqlStreamDemo {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // The Blink planner is the only planner since Flink 1.14, so the
    // deprecated useBlinkPlanner() call is no longer needed.
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    // WordSourceFunction is defined elsewhere in the project.
    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream, $("word"))

    // Interpolating the Table calls its toString, which yields a generated
    // table name (UnnamedTable$0 in the plan output) usable from SQL.
    val result = tEnv.sqlQuery(s"select * from $table where word like '%t%'")

    // Implicit TypeInformation consumed by toAppendStream to produce Rows.
    implicit val rowStringType = ROW(STRING)
    tEnv.toAppendStream(result).print()

    // Printed after the print sink is attached, so the plan includes the sink node.
    println(senv.getExecutionPlan)
    senv.execute()
  }
}
结果如下:
{ "nodes" : [ { "id" : 1, "type" : "Source: Custom Source", "pact" : "Data Source", "contents" : "Source: Custom Source", "parallelism" : 1 }, { "id" : 2, "type" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])", "pact" : "Operator", "contents" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])", "parallelism" : 1, "predecessors" : [ { "id" : 1, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 3, "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])", "pact" : "Operator", "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])", "parallelism" : 1, "predecessors" : [ { "id" : 2, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 4, "type" : "SinkConversionToRow", "pact" : "Operator", "contents" : "SinkConversionToRow", "parallelism" : 1, "predecessors" : [ { "id" : 3, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 5, "type" : "Sink: Print to Std. Out", "pact" : "Data Sink", "contents" : "Sink: Print to Std. Out", "parallelism" : 8, "predecessors" : [ { "id" : 4, "ship_strategy" : "REBALANCE", "side" : "second" } ] } ] } 7> +I[batch] 8> +I[table] 1> +I[stream] 2> +I[stream] 3> +I[table] ......省略部分......
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Pure Table API environment (no DataStream conversion methods such as fromDataStream).
// Choose the execution mode on the builder: keep inStreamingMode() for streaming,
// or switch to inBatchMode() for batch.
val settings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()   // streaming mode
  // .inBatchMode()    // batch mode
  .build()

val tEnv: TableEnvironment = TableEnvironment.create(settings)
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Table environment bridged to the DataStream API: it is built on top of an
// existing StreamExecutionEnvironment, which is what enables fromDataStream.
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(senv)
mysql中表user1的结构和数据,和表user2的结构如下:
mysql> mysql> show create table flink_test.user1; +-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | user1 | CREATE TABLE `user1` ( `id` bigint NOT NULL, `name` varchar(128) DEFAULT NULL, `age` int DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci | +-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row in set (0.00 sec) mysql> mysql> select * from flink_test.user1; +----+------+------+ | id | name | age | +----+------+------+ | 1 | yi | 1 | | 2 | er | 2 | | 3 | san | 3 | +----+------+------+ 3 rows in set (0.00 sec) mysql> mysql> show create table flink_test.user2; +-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | user2 | CREATE TABLE `user2` ( `id` bigint NOT NULL, `name` varchar(128) DEFAULT NULL, `age` int DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci | 
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row in set (0.01 sec) mysql>
TableSqlTest.scala
package TableApiTest

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Copies rows from the MySQL table `user1` into `user2` (database
 * `flink_test`) through the Flink JDBC connector.
 */
object TableSqlTest {

  // Connection settings. The original hard-coded values stay as defaults, but
  // each one can be overridden through an environment variable so credentials
  // do not have to be committed with the source.
  private val JdbcUrl: String =
    sys.env.getOrElse("MYSQL_URL", "jdbc:mysql://192.168.23.33:3306/flink_test")
  private val JdbcUser: String = sys.env.getOrElse("MYSQL_USER", "root")
  private val JdbcPassword: String = sys.env.getOrElse("MYSQL_PASSWORD", "Root_123")

  /**
   * DDL for a temporary Flink table named `tableName`, backed by the MySQL
   * table of the same name.
   *
   * @param tableName name used both for the Flink table and for 'table-name'
   * @return the CREATE TEMPORARY TABLE statement
   */
  private def jdbcTableDdl(tableName: String): String =
    s"""
       |create temporary table $tableName(
       |  id bigint,
       |  name string,
       |  age int,
       |  primary key (id) not enforced
       |) with (
       |  'connector' = 'jdbc',
       |  'url' = '$JdbcUrl',
       |  'driver' = 'com.mysql.cj.jdbc.Driver',
       |  'table-name' = '$tableName',
       |  'username' = '$JdbcUser',
       |  'password' = '$JdbcPassword'
       |)
       |""".stripMargin

  def main(args: Array[String]): Unit = {
    // Table environment in streaming mode.
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Register the source and target tables in the catalog.
    Seq("user1", "user2").foreach(name => tEnv.executeSql(jdbcTableDdl(name)))

    // ===================== Reading the source table =====================
    // val user1 = tEnv.from("user1")                            // option 1
    // val user1 = tEnv.sqlQuery("select * from user1 limit 2")  // option 2

    // ================= Inserting into the target table ==================
    // user1.executeInsert("user2")                              // option 1
    val stmtSet = tEnv.createStatementSet()
    // stmtSet.addInsert("user2", user1)                         // option 2
    // NOTE: LIMIT without ORDER BY does not guarantee which two rows are copied.
    stmtSet.addInsertSql("insert into user2 select * from user1 limit 2") // option 3

    // execute() submits the INSERT job asynchronously; await() blocks until the
    // job finishes, so main does not return early and job failures are rethrown here.
    stmtSet.execute().await()
  }
}
执行TableSqlTest.scala后,查询mysql中表user2的数据:
mysql>
mysql> select * from user2;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
+----+------+------+
2 rows in set (0.00 sec)

mysql>
代码示例:
package TableApiTest

import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Computes the same aggregation twice: the sum of positive `amount` values
 * per `name`. The first run uses the Table API and the second uses SQL, and
 * both results are printed.
 */
object TableSqlTest {

  def main(args: Array[String]): Unit = {
    // Table environment in streaming mode.
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // fromValues names the columns f0 and f1; rename them to amount and name.
    val orders = tEnv
      .fromValues(row(10, "A"), row(20, "A"), row(100, "B"), row(200, "B"))
      .renameColumns($("f0").as("amount"), $("f1").as("name"))

    // Expose the table to SQL as tmp_table.
    tEnv.createTemporaryView("tmp_table", orders)

    // Table API variant.
    val byNameViaTableApi = orders
      .filter($("amount").isGreater(lit(0)))
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    byNameViaTableApi.execute().print()

    // Equivalent SQL variant.
    val byNameViaSql = tEnv.sqlQuery(
      "select name, sum(amount) as amount from tmp_table where amount > 0 group by name")
    byNameViaSql.execute().print()
  }
}
执行结果:
+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set

+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set