Table of Contents
API
Get the Environment
Create a Table
Query a Table
Table API
SQL
Emit a Table
Integration with DataSet/DataStream
Table API
SQL API
Create a TableEnvironment: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")
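For a concrete version of the descriptor chain above, here is a minimal sketch that registers a pipe-delimited CSV file as a source table. It reuses the same FileSystem/Csv/Schema descriptors that the sink example further below uses; the path, field names, and table name are made up for illustration.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

// hypothetical pipe-delimited CSV file registered as temporary table "MyCsvSource"
tableEnv.connect(new FileSystem().path("/path/to/input"))
    .withFormat(new Csv().fieldDelimiter('|'))
    .withSchema(new Schema()
        .field("cID", DataTypes.BIGINT())
        .field("cName", DataTypes.STRING())
        .field("revenue", DataTypes.DOUBLE()))
    .createTemporaryTable("MyCsvSource");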
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");

// compute revenue for all customers from France
Table revenue = orders
  .filter($("cCountry").isEqual("FRANCE"))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
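Both query snippets above stop at the "emit or convert Table / execute query" comments. One way to actually run the query during development, assuming the revenue table from above, is to execute it and print the result; this uses Table.execute() / TableResult.print() (available since Flink 1.11) and is an illustrative addition, not part of the original snippets.

import org.apache.flink.table.api.TableResult;

// execute the query and print the resulting rows to stdout (collects results to the client)
TableResult tableResult = revenue.execute();
tableResult.print();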
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.BIGINT());

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
Integration with DataStream and DataSet API: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
Convert a Table into a DataStream
Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
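As a small illustrative sketch (not part of the official snippet), the boolean flag of the retract stream described above can be used to split the stream into INSERT and DELETE changes, for example while debugging; retractStream is the variable from the snippet above.

// true flag = INSERT (new or updated result), false flag = DELETE (retraction of a previous result)
DataStream<Tuple2<Boolean, Row>> inserts = retractStream.filter(change -> change.f0);
DataStream<Tuple2<Boolean, Row>> deletes = retractStream.filter(change -> !change.f0);

// print with a prefix so the two change types can be told apart in the output
inserts.print("insert");
deletes.print("delete");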
Convert a Table into a DataSet
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
Table API reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
SQL reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/