MySql教程

Apache Flink之简单的使用Table API访问mysql数据库

本文主要是介绍Apache Flink之简单的使用Table API访问mysql数据库,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1. 声明

当前的内容主要为记录在学习Apache Flink中遇到的问题和主要记录访问msyql实现sql查询的基本操作

主要内容为:

  1. 使用Flink操作的SQL API访问mysql数据库

pom文件,其中${flink.version}1.13.0

<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>

		<!-- Add connector dependencies here. They must be in the default scope 
			(compile). -->
		<!-- 直接导入需要的flink到kafka的连接器 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- 提供Table Api的功能 (java版的) -->
		 <dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency> 
		<!-- 提供本地运行的能力 -->
		 <dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>1.13.0</version>
			<scope>provided</scope>
		</dependency>
		<!-- 提供jdbc的连接器的,可以连接数据库 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.13</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

2. 准备

1.提供通过flink连接mysql的连接器(手动导入maven依赖):官方文档
在这里插入图片描述
2.主要使用方式:官方文档

3.准备一个数据库flink_test并创建一个t_user的表
在这里插入图片描述

3. 主要demo

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.sinks.CsvAppendTableSinkFactory;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import com.hy.flink.test.data.StudentDatas;

/**
 * 
 * @author hy
 * @createTime 2021-06-12 13:56:26
 * @description 当前内容主要为测试当前的TableApi的基本操作
 * 注意使用的时候需要导入对应的maven依赖:官方依赖否则编译报错
 *
 */
public class TableApiTest {
	@SuppressWarnings("deprecation")
	public static void main(String[] args) {
		// 无法启动:使用useAnyPlanner
		// EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
		// 无法启动:使用useOldPlanner
		// EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
		
		// 可以正常启动:使用useBlinkPlanner,但是会报错:MiniCluster is not yet running or has already been shut down.
		EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.createLocalEnvironment();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv,fsSettings);
	
		tableEnv.executeSql("CREATE TABLE t_user (\n" + 
				"id  INT,\n" + 
				"name   VARCHAR(50),\n" + 
				"age INT,\n" + 
				"score DOUBLE,\n" + 
				"className VARCHAR(50)\n" + 
				") WITH (\n" + 
				"   'connector' = 'jdbc',\n" + 
				"   'driver'='com.mysql.cj.jdbc.Driver',\n"+
				"   'url'='jdbc:mysql://localhost:3306/flink_test?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8',\n" + 
				"   'table-name'='t_user',\n" + 
				"	'username'='root',\n"+
				"	'password'='root'\n"+
				")");

		Table sqlQuery = tableEnv.sqlQuery("select * from t_user where name='张三'");
	
		TableResult tableResult = sqlQuery.execute();
		CloseableIterator<Row> collect = tableResult.collect();
		while(collect.hasNext()) {
			Row row = collect.next();
			System.out.println(row);
		}

	}
}

执行结果:
在这里插入图片描述
虽然报错了但是可以查询到数据,结果可以执行的

主要就是在执行sql的时候需要指定with并指定连接器方式和其他的属性

4. 主要出现的错误

  1. Partial inserts are not supported(当前不支持部分插入操作)
    例如:只能insert into t_user(id,name,age,score,className) values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202')不能使用insert into t_user values(1,'张三',18,55.5,'201'),(2,'李四',22,59.5,'202'),也就是不能手动选择添加的列项
  2. Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.(这个主要出现在不能使用local方式运行,导致的问题,如果需要在本地运行需要使用useBlinkPlanner()即可解决)

当前的Flink中的可以使用useBlinkPlanner在本地执行连接并执行SQL API的操作,但是使用useAnyPlanner和useOldPlanner是不行的,会出现报错2的情况

5.总结

1.当前的Flink虽然提供了操作数据库的各种连接器的SQL API但是在本地测试的时候还是以BlinkPlanner方式才可以执行

2.使用SQL API还是需要schame的,否则无法执行操作

这篇关于Apache Flink之简单的使用Table API访问mysql数据库的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!