Preface: As Flink's adoption in China continues to grow, the functionality and stability of its components keep improving. To address some of our existing complex requirements, this write-up explores the features of Flink SQL as a basis for deciding whether to adopt it, and tests those requirements with simple demos. The tests focus on three systems: Kafka, MySQL, and Impala, and follow the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/
The following components are used in this evaluation:
CDH 6.3.2, Flink 1.12.2, MySQL 5.7, Impala 3.2.0-cdh6.3.2, Kafka 2.2.1-cdh6.3.2
The tests connect Flink SQL to Kafka, MySQL, and Impala, so the following dependency jars are required:
flink-connector-kafka_2.11-1.12.2.jar
flink-connector-jdbc_2.11-1.11.2.jar
mysql-connector-java-5.1.47.jar
ImpalaJDBC4.jar
ImpalaJDBC41.jar
flink-sql-connector-kafka_2.11-1.12.2.jar
After placing the jars above into $FLINK_HOME/lib (on every server where Flink is deployed), restart the yarn-session and start the SQL client:
yarn-session.sh --detached
sql-client.sh embedded
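Optionally, the SQL client's result display mode can be switched so that query results print directly in the terminal instead of the interactive table view (a small sketch; assumes the execution.result-mode setting of the Flink 1.12 SQL client):
SET execution.result-mode=tableau;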
Flink SQL Kafka connector documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
Mapping Kafka data to a table with Flink SQL
1. Create the topic:
kafka-topics --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic test01
2. Simulate a consumer:
kafka-console-consumer --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01 --from-beginning
3. Simulate a producer:
kafka-console-producer --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01
4. Delete the topic:
kafka-topics --delete --topic test01 --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181
CREATE TABLE t1 (
  name STRING,
  age BIGINT,
  isStu INT,
  opt STRING,
  optDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',                -- use the kafka connector
  'topic' = 'test01',                   -- kafka topic
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'format' = 'csv'                      -- source format is csv
);
select * from t1;
Write data into Kafka and watch table t1 change in Flink SQL:
zhangsan,20,1,1
lisi,18,1,2
wangwu,30,2,2
Mapping Kafka data into a table lets data flow into the table in real time, after which follow-up processing can be done in SQL. Compared with writing code, this is simpler, and problems are easier to troubleshoot.
The upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event.
If the key already exists, the record is an update; if not, it is an insert; and if the value is null, the record represents a delete.
drop table t2;
CREATE TABLE t2 (
  name STRING,
  age BIGINT,
  isStu INT,
  opt STRING,
  optDate TIMESTAMP(3),
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test02',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'key.format' = 'csv',
  'value.format' = 'csv'
);
Write the data from t1 into t2:
INSERT INTO t2 SELECT * FROM t1;
select * from t2;
The results are as follows:
Continue with the Kafka producer and write the following data:
zhangsan,25,1,2
risen,8,8,8
lisi,0,0,
The results are as follows:
With the test above, the two updates and the one insert all worked as expected.
According to the official documentation, when a key is specified and the value is null, the record is treated as a delete.
But suppose I write a record like the following into Kafka:
lisi,,,
With only a key and no value, table t1 reports the following error:
This is because several columns were defined as INT when the table was created, and the empty fields default to the empty string "", which is rather clumsy. Presumably only formats like JSON, where the data types are explicit, can handle this directly; with CSV the types are ambiguous, so empty fields cannot be cast to the declared types.
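An alternative that might keep the numeric column types is the CSV format's null-literal option, which maps a chosen literal (here the empty string) to NULL instead of failing the cast. A minimal sketch, not verified in this test:
CREATE TABLE t1 (
  name STRING,
  age BIGINT,
  isStu INT,
  opt STRING,
  optDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'test01',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',
  'format' = 'csv',
  'csv.null-literal' = '',             -- interpret empty CSV fields as NULL instead of ""
  'csv.ignore-parse-errors' = 'true'   -- skip rows that still fail to parse
);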
Given this, to find out whether the delete operation really works, I modified all of the table definitions above. To keep the experiment simple, I just changed the columns to STRING and tested again:
drop TABLE t1;
CREATE TABLE t1 (
  name STRING,
  age STRING,
  isStu STRING,
  opt STRING,
  optDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',                -- use the kafka connector
  'topic' = 'test01',                   -- kafka topic
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'format' = 'csv'                      -- source format is csv
);
drop table t2;
CREATE TABLE t2 (
  name STRING,
  age STRING,
  isStu STRING,
  opt STRING,
  optDate TIMESTAMP(3),
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test02',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'key.format' = 'csv',
  'value.format' = 'csv'
);
INSERT INTO t2 SELECT * FROM t1;
select * from t2;
The record still was not deleted from t2. This needs further investigation; I will follow up on it later.
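A likely explanation is that a delete requires a true Kafka tombstone: a record whose value is literally null, not an empty CSV string. The plain kafka connector on test01 is append-only, so nothing written to test01 can turn into a DELETE on t2; the delete semantics apply to the upsert-kafka topic test02 itself, where a null-value record for a key is read as a delete. kafka-console-producer cannot easily emit such a record, but kafkacat (kcat) can; a hedged sketch, assuming the tool is available and not verified in this test:
# -K sets the key delimiter; -Z sends an empty value as NULL, i.e. a tombstone
kafkacat -P -b 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 -t test02 -K, -Z
# then type a line containing only the key and the delimiter, e.g.:
# lisi,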
Next, to observe what happens when Kafka itself expires the data, recreate the topic with a 10-minute retention:
kafka-topics --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic test01 --config log.retention.minutes=10
(Note: log.retention.minutes is a broker-level setting; the topic-level equivalent is retention.ms, e.g. --config retention.ms=600000.)
Other commands used during this test:
Produce test data into test01:
kafka-console-producer --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01
Delete the topic:
kafka-topics --delete --topic test01 --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181
Consume the aggregated output topic:
kafka-console-consumer --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic output --from-beginning
Describe the topic:
kafka-topics --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01 --describe
Recreate the tables for this test, this time defining a watermark on t1:
CREATE TABLE t1 (
  name STRING,
  age BIGINT,
  isStu INT,
  opt STRING,
  optDate TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR optDate AS optDate - INTERVAL '5' SECOND  -- define a watermark on optDate, making it the event-time column
) WITH (
  'connector' = 'kafka',                -- use the kafka connector
  'topic' = 'test01',                   -- kafka topic
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'format' = 'csv'                      -- source format is csv
);
CREATE TABLE t2 (
  name STRING,
  age BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'output',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'key.format' = 'csv',
  'value.format' = 'csv'
);
INSERT INTO t2 SELECT name, max(age) FROM t1 GROUP BY name;
Rows written to test01:
zhangsan,18,1,insert
lisi,20,2,update
wangwu,30,1,delete
Because Kafka deleted the underlying data, the Kafka-backed table t1 ends up empty.
But t2 is an aggregate table built from t1, so even with t1 emptied, t2 still holds its data.
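Incidentally, the watermark defined on optDate is not used by the plain GROUP BY above; it only matters for event-time windowed aggregations. A minimal sketch of such a query (not part of the original test):
SELECT
  name,
  TUMBLE_START(optDate, INTERVAL '1' MINUTE) AS windowStart,
  MAX(age) AS maxAge
FROM t1
GROUP BY name, TUMBLE(optDate, INTERVAL '1' MINUTE);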
Flink SQL JDBC connector documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
Prepare a MySQL table with a few rows:
create table test.test01(name varchar(10), age int, primary key (name));
INSERT INTO test.test01(name, age) VALUES('zhangsan', 20);
INSERT INTO test.test01(name, age) VALUES('lisi', 30);
INSERT INTO test.test01(name, age) VALUES('wangwu', 18);
Map it in Flink SQL:
drop table mysqlTest;
create table mysqlTest (
  name string,
  age int,
  PRIMARY KEY (name) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.5.187:3306/test',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'test01'
);
select * from mysqlTest;
Insert a new record through Flink SQL:
INSERT INTO mysqlTest(name, age) VALUES('risen', 88);
The new record appears in both the Flink table and the MySQL table.
Now upsert an existing key:
INSERT INTO mysqlTest (name, age) VALUES('zhangsan', 50);
Result in Flink SQL:
Result in MySQL:
The official documentation does briefly mention delete, but in practice it did not take effect.
The JDBC connector allows reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases. If a primary key is defined in the DDL, the JDBC sink operates in upsert mode to exchange UPDATE/DELETE messages with the external system; otherwise it operates in append mode and does not support UPDATE/DELETE messages.
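For contrast, a sketch of the same mapping without a primary key, which would run in append mode and accept only insert-only queries (the table name mysqlTestAppend is made up for illustration):
create table mysqlTestAppend (
  name string,
  age int
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.5.187:3306/test',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'test01'
);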
Attempting a delete:
DELETE FROM mysqlTest where name='zhangsan';
INSERT INTO mysqlTest (name, age) VALUES('zhangsan', null);
For Flink SQL against MySQL: of the CRUD operations, insert and query are easy to implement; update works only if a primary key is specified when the table is created (so the sink runs in upsert mode); and delete currently does not seem achievable.
Next, test Impala. Create a Kudu table through impala-shell and insert a few rows:
drop table kudu_test.kuduTest;
CREATE TABLE kudu_test.kuduTest (
  name string,
  age BIGINT,
  isStu INT,
  opt STRING,
  PRIMARY KEY(name)
) STORED AS KUDU;
INSERT INTO kudu_test.kuduTest(name, age, isStu, opt) VALUES('zhangsan', 20, 1, '1');
INSERT INTO kudu_test.kuduTest(name, age, isStu, opt) VALUES('lisi', 30, 1, '1');
INSERT INTO kudu_test.kuduTest(name, age, isStu, opt) VALUES('wangwu', 18, 1, '1');
Then try to map it in Flink SQL through the Impala JDBC driver:
drop table impalaTest;
create table impalaTest (
  name string,
  age int,
  PRIMARY KEY (name) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:impala://192.168.5.185:21050/kudu_test',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'kuduTest',
  'driver' = 'com.cloudera.impala.jdbc4.Driver'
);
select * from impalaTest;
Unfortunately, Impala is not supported.
Connecting to Impala through the Flink JDBC connector is not currently supported.
Summary:
1. Flink SQL supports Kafka and MySQL, including upsert. When testing delete, however, neither could perform it directly; it can only be achieved logically by adding an aggregation layer on top. Connecting Flink SQL to Impala fails with an error and is not currently supported, but writing the data to Kafka and letting Impala consume it from there is a possible workaround.
2. In big-data scenarios, every record has value. If one day a requirement such as "count how many records were deleted" comes up, physically deleted data can never be recovered and the requirement cannot be met. The recommendation is therefore not to delete any data, but to keep a status on each record and delete only logically, i.e. exclude records whose current status is "deleted" from the statistics, as sketched below.
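As a minimal illustration of logical deletion, assuming the t1/t2 tables from earlier where the opt column carries the operation type, a count of "live" records simply filters out the deleted state instead of physically removing rows:
SELECT count(*) AS liveRecords
FROM t2
WHERE opt <> 'delete';   -- rows whose latest state is 'delete' are excluded, not removed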
The following requirements are then implemented with a simple demo:
1. Dimension table updates
2. Real-time statistics metrics
Sample data: name, age, identity, in-school status (1 = in school, 2 = not in school)
name,age,identity,status
zhangsan,20,1,1
lisi,18,1,2
wangwu,30,2,2
Identity dimension table dim_identity: 1 = 学生 (student), 2 = 老师 (teacher)
create table test.dim_identity(name varchar(10), identity int);
INSERT INTO test.dim_identity(name, identity) VALUES('学生', 1);
INSERT INTO test.dim_identity(name, identity) VALUES('老师', 2);
Map it in Flink SQL:
drop table dim_identity;
create table dim_identity (
  name string,
  identity int
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.5.187:3306/test',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'dim_identity'
);
select * from dim_identity;
Map the Kafka log as the ODS layer:
CREATE TABLE ods_kafka (
  name STRING,
  age BIGINT,
  identity INT,
  status STRING,
  insertDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',                -- use the kafka connector
  'topic' = 'stuLog',                   -- kafka topic
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'format' = 'csv'                      -- source format is csv
);
select * from ods_kafka;
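The demo assumes the stuLog topic already exists and contains the sample rows listed above; a sketch of preparing it with the same commands used for test01 earlier:
kafka-topics --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic stuLog
kafka-console-producer --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic stuLog
# then paste the sample rows, one per line: zhangsan,20,1,1 / lisi,18,1,2 / wangwu,30,2,2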
Use upsert so that the latest record is taken as each user's current state:
drop table tds_user_status;
CREATE TABLE tds_user_status (
  name STRING,
  age BIGINT,
  identity INT,
  status STRING,
  insertDate TIMESTAMP(3),
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'tdsResult',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'key.format' = 'csv',
  'value.format' = 'csv'
);
INSERT INTO tds_user_status SELECT * FROM ods_kafka;
select * from tds_user_status;
Compute the following metrics:
1. Number of teachers currently in school
2. Total number of students
3. Proportion of students among all people
4. Proportion of people currently in school
Create a Flink SQL table to receive the metrics:
drop table rpt_result;
CREATE TABLE rpt_result (
  inStuTeacherNum INT,
  StudentNum INT,
  StudentRate FLOAT,
  inStuRate FLOAT,
  countDate TIMESTAMP(3) METADATA FROM 'timestamp',
  PRIMARY KEY (countDate) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'rptResult',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker addresses
  'key.format' = 'csv',
  'value.format' = 'csv'
);
Run the aggregation:
INSERT INTO rpt_result
SELECT
  sum(case when t2.name = '老师' and t1.status = '1' then 1 else 0 end) AS inStuTeacherNum,
  sum(case when t2.name = '学生' then 1 else 0 end) AS StudentNum,
  cast(sum(case when t2.name = '学生' then 1 else 0 end) as float) / sum(1) AS StudentRate,
  cast(sum(case when t1.status = '1' then 1 else 0 end) as float) / sum(1) AS inStuRate
FROM tds_user_status t1
LEFT JOIN dim_identity t2 ON t1.identity = t2.identity;
select * from rpt_result;
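One caveat on the final query: the regular left join reads dim_identity as it is when the job starts, so later changes to the MySQL dimension table are not picked up. The usual Flink SQL pattern for a changing dimension table is a JDBC lookup join via FOR SYSTEM_TIME AS OF, which needs a processing-time attribute on the probe side. A minimal sketch (ods_kafka_proc is a hypothetical variant of ods_kafka with a proctime column added; not part of the original test):
CREATE TABLE ods_kafka_proc (
  name STRING,
  age BIGINT,
  identity INT,
  status STRING,
  insertDate TIMESTAMP(3) METADATA FROM 'timestamp',
  proc_time AS PROCTIME()    -- processing-time attribute required for the lookup join
) WITH (
  'connector' = 'kafka',
  'topic' = 'stuLog',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',
  'format' = 'csv'
);

SELECT t1.name, t1.age, t2.name AS identityName, t1.status
FROM ods_kafka_proc AS t1
LEFT JOIN dim_identity FOR SYSTEM_TIME AS OF t1.proc_time AS t2
  ON t1.identity = t2.identity;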