Flink has been gaining a lot of traction in China. I first came across it in 2018 and have been following its updates on the official site and various WeChat accounts ever since, but never had a chance to run it in production. A streaming-computation requirement recently came up, and the demand for real-time processing keeps growing, so today I test it in a local environment: data is read from Kafka, processed by Flink, and written to MySQL.
Environment: Java 8, Scala 2.12
Version: Flink 1.13.0
Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Data in Kafka is stored as JSON; flink-json is required to parse it -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
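The POM snippet above references ${flink.version} and ${scala.binary.version} without defining them. A minimal properties block matching the environment listed earlier (the property names themselves are an assumption, but a common convention):

<properties>
    <!-- assumed property definitions matching the environment above -->
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>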
First, produce some JSON test data into the Kafka topic:

// JSONObject is assumed to be fastjson's com.alibaba.fastjson.JSONObject (it provides toJSONString())
for (int i = 1; i < 10; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name" + i);
    // The topic must match the one the Flink job reads ("flinkdemo")
    ProducerRecord<Integer, String> record =
            new ProducerRecord<Integer, String>("flinkdemo", i, json.toJSONString());
    // Send the message
    producer.send(record);
}
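The loop assumes an already-configured producer. A minimal sketch of that setup, assuming the local broker used throughout this post (the serializers follow the ProducerRecord<Integer, String> types):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

// ... run the loop above, then flush and close
producer.flush();
producer.close();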
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Map the Kafka topic to a temporary source table
tableEnv.executeSql(
        "CREATE TABLE sensor_source (id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'flinkdemo'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'properties.group.id' = 'flinkGroup'," +
        " 'scan.startup.mode' = 'earliest-offset'," +
        " 'format' = 'json'," +
        " 'json.fail-on-missing-field' = 'false'," +
        " 'json.ignore-parse-errors' = 'true')"
);

// Map the MySQL table to a temporary sink table;
// test_info must be created in MySQL beforehand (see the DDL sketch below)
String mysql_sql = "CREATE TABLE mysql_sink (" +
        " id STRING," +
        " name STRING" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false'," +
        " 'table-name' = 'test_info'," +
        " 'username' = 'test'," +
        " 'password' = 'pwd'" +
        ")";
tableEnv.executeSql(mysql_sql);

// Insert the data
tableEnv.executeSql("INSERT INTO mysql_sink SELECT id, name FROM sensor_source");
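As the comment notes, test_info has to exist in MySQL before the job runs. A minimal sketch of that table (column lengths and the primary key are assumptions):

CREATE TABLE test_info (
    id   VARCHAR(32) NOT NULL,
    name VARCHAR(64),
    PRIMARY KEY (id)
);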
Query the table in MySQL and you can watch the rows being inserted in real time.
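For example:

SELECT * FROM test_info ORDER BY id;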
The first error I hit along the way:

Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "<EOF>" at line 1, column 263.
Was expecting one of:
    "UESCAPE" ...
    <QUOTED_STRING> ...
    ")" ...
    "," ...
The cause: in the following code, the WITH ( ... ) clause is missing its closing parenthesis.
// Map the Kafka topic to a temporary source table
tableEnv.executeSql(
        "CREATE TABLE sensor_source(id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'flinkdemo'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'properties.group.id' = 'flinkGroup'," +
        " 'scan.startup.mode' = 'earliest-offset'," +
        " 'format' = 'json'" // <-- the closing ")" of the WITH clause is missing here
);
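The fix, as in the working version earlier, is to close the clause on the last line:

        " 'format' = 'json')"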
The next error:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Cause: the flink-json dependency is missing. Adding it resolves the error:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
Another error:

Exception in thread "main" org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:775)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at org.apache.FlinkSqlDemo.main(FlinkSqlDemo.java:71)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
Cause: the flink-clients dependency is missing:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
The last pitfall: the program simply stops, and no data appears in MySQL:

Process finished with exit code 0
Cause: the MySQL JDBC driver jar is missing:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
Besides MySQL, the same source table can be wired to other sinks.

// Sink to the console
String print_sql = "CREATE TABLE print_sink (" +
        " id STRING," +
        " name STRING" +
        ") WITH (" +
        " 'connector' = 'print'" +
        ")";
tableEnv.executeSql(print_sql);
tableEnv.executeSql("INSERT INTO print_sink SELECT * FROM sensor_source");

// Sink to another Kafka topic
String kafka_sink_sql = "CREATE TABLE kafka_sink (id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'test_info_2'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'format' = 'json'" +
        ")";
tableEnv.executeSql(kafka_sink_sql);
tableEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM sensor_source");
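To check the Kafka sink, the sink topic can be read back with the console consumer that ships with Kafka (script location depends on your installation):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_info_2 --from-beginning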