These notes mainly record problems encountered while learning Apache Flink, along with the basic steps for accessing MySQL and running SQL queries against it.
The main contents are as follows.

pom file (with ${flink.version} set to 1.13.0):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Flink-to-Kafka connector, imported directly -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Provides the Table API (Java version) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Provides the ability to run locally -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Provides the JDBC connector, for connecting to databases -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
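Note that the snippet above references ${scala.binary.version} and ${log4j.version} without defining them. A minimal <properties> sketch that would let the pom resolve might look like the following; only flink.version is stated in this post, the other two values are assumptions (the artifacts above mix _2.11 and _2.12 suffixes, so in practice pick one Scala version and use it consistently):

<properties>
    <!-- Stated in the post -->
    <flink.version>1.13.0</flink.version>
    <!-- Assumption: most table/jdbc artifacts above use the _2.11 suffix -->
    <scala.binary.version>2.11</scala.binary.version>
    <!-- Assumption: a Log4j 2 version contemporary with Flink 1.13 -->
    <log4j.version>2.12.1</log4j.version>
</properties>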
1. The connector for accessing MySQL from Flink (its Maven dependency must be imported manually): see the official documentation.
2. Basic usage: see the official documentation.
3. Prepare a database named flink_test and create a table named t_user in it (a sketch of the DDL follows below).
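Since the post does not show the MySQL-side DDL, here is a minimal sketch of what flink_test.t_user could look like; the column names and types mirror the Flink CREATE TABLE statement in the test class below, while the character set and the PRIMARY KEY on id are assumptions:

CREATE DATABASE IF NOT EXISTS flink_test DEFAULT CHARACTER SET utf8mb4;
USE flink_test;

-- Columns mirror the Flink DDL used in TableApiTest below;
-- the PRIMARY KEY on id is an assumption, not stated in the post.
CREATE TABLE t_user (
    id        INT PRIMARY KEY,
    name      VARCHAR(50),
    age       INT,
    score     DOUBLE,
    className VARCHAR(50)
);

The test class below registers this table in Flink via DDL and then queries it: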
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/**
 * @author hy
 * @createTime 2021-06-12 13:56:26
 * @description Tests the basic operations of the Table API.
 *              Note: the corresponding Maven dependencies (see the official
 *              docs) must be imported, otherwise compilation fails.
 */
public class TableApiTest {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Fails to start: useAnyPlanner
        // EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
        // Fails to start: useOldPlanner
        // EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        // Starts normally with useBlinkPlanner, but still reports:
        // MiniCluster is not yet running or has already been shut down.
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // Register the MySQL table via DDL: the WITH clause specifies the
        // connector type and the remaining connection properties.
        tableEnv.executeSql("CREATE TABLE t_user (\n"
                + "id INT,\n"
                + "name VARCHAR(50),\n"
                + "age INT,\n"
                + "score DOUBLE,\n"
                + "className VARCHAR(50)\n"
                + ") WITH (\n"
                + " 'connector' = 'jdbc',\n"
                + " 'driver'='com.mysql.cj.jdbc.Driver',\n"
                + " 'url'='jdbc:mysql://localhost:3306/flink_test?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8',\n"
                + " 'table-name'='t_user',\n"
                + " 'username'='root',\n"
                + " 'password'='root'\n"
                + ")");

        // Run the query and print every matching row.
        Table sqlQuery = tableEnv.sqlQuery("select * from t_user where name='张三'");
        TableResult tableResult = sqlQuery.execute();
        CloseableIterator<Row> collect = tableResult.collect();
        while (collect.hasNext()) {
            Row row = collect.next();
            System.out.println(row);
        }
    }
}
Execution result:

Although an error is reported, the data can still be queried, so the job does execute. The key point is that the SQL DDL needs a WITH clause specifying the connector type and the other connection properties.
Error 1: Partial inserts are not supported

Partial inserts are not supported here: the column-list form

insert into t_user(id,name,age,score,className) values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202')

cannot be used; it has to be written without the column list:

insert into t_user values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202')

In other words, you cannot manually choose which columns to insert into.

Error 2: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
(This mainly shows up when the job cannot be run in local mode; to run locally, switching to useBlinkPlanner() resolves it.) With the current Flink, useBlinkPlanner can connect and execute SQL API operations locally, but useAnyPlanner and useOldPlanner cannot; they fail with error 2.
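Putting error 1's workaround together with the table registration from TableApiTest, a hedged sketch of how the full-row insert would be submitted; this snippet is illustrative, not from the original post, and assumes tableEnv is the StreamTableEnvironment created above and that the surrounding method declares throws Exception:

// Assumes the t_user JDBC table was already registered via tableEnv.executeSql(...).
// A column-list form such as insert into t_user(id,name,...) would trigger
// "Partial inserts are not supported" on this Flink version, so insert full rows.
TableResult insertResult = tableEnv.executeSql(
        "insert into t_user values "
        + "(1,'张三',18,55.5,'201'),"
        + "(2,'李四',22,59.5,'202')");
// executeSql submits the insert as a Flink job; await() blocks until it finishes.
insertResult.await();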
1. Although the current Flink provides SQL API connectors for operating all kinds of databases, local testing only executes with the Blink planner.
2. Using the SQL API still requires a schema; without one, the operation cannot be executed.