By default, Flink updates state once for every incoming record.
MiniBatch: buffer a batch of records and update state once for the whole batch. Pro: higher throughput. Con: higher latency.
Enable MiniBatch
-- enable in SQL
set table.exec.mini-batch.enabled=true;
-- maximum buffering latency
set table.exec.mini-batch.allow-latency='5 s';
-- batch size
set table.exec.mini-batch.size=1000;
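The buffering idea behind MiniBatch can be illustrated with a small sketch (plain Python, an analogy rather than Flink internals): records are held in a buffer, and state is updated once per batch, triggered by either the batch size or the allowed latency.

```python
import time

class MiniBatchCounter:
    """Toy sketch of MiniBatch: buffer records, update state once per batch."""

    def __init__(self, max_size=1000, max_latency_s=5.0):
        self.max_size = max_size
        self.max_latency_s = max_latency_s
        self.buffer = []
        self.first_arrival = None
        self.state = {}          # word -> count (stands in for the state backend)
        self.state_updates = 0   # how many times state was touched

    def process(self, word, now=None):
        now = time.monotonic() if now is None else now
        if self.first_arrival is None:
            self.first_arrival = now
        self.buffer.append(word)
        # Flush on batch size (mini-batch.size) or elapsed time (allow-latency).
        if (len(self.buffer) >= self.max_size
                or now - self.first_arrival >= self.max_latency_s):
            self.flush()

    def flush(self):
        # One state update per distinct key per batch, instead of per record.
        batch_counts = {}
        for w in self.buffer:
            batch_counts[w] = batch_counts.get(w, 0) + 1
        for w, c in batch_counts.items():
            self.state[w] = self.state.get(w, 0) + c
            self.state_updates += 1
        self.buffer.clear()
        self.first_arrival = None

counter = MiniBatchCounter(max_size=4)
for w in ["a", "b", "a", "a"]:   # 4 records trigger a size-based flush
    counter.process(w)
print(counter.state)          # {'a': 3, 'b': 1}
print(counter.state_updates)  # 2 state updates instead of 4
```

With per-record updates the state backend would be touched 4 times; with the batch it is touched once per distinct key, which is where the throughput gain comes from.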
Pre-aggregation requires MiniBatch to be enabled first.
set table.exec.mini-batch.enabled=true;
-- maximum buffering latency
set table.exec.mini-batch.allow-latency='5 s';
-- batch size
set table.exec.mini-batch.size=1000;
-- enable pre-aggregation (two-phase aggregation)
set table.optimizer.agg-phase-strategy=TWO_PHASE;
Example
-- drop the table
drop table words;
-- source table
CREATE TABLE words (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000000', -- rows generated per second
    'fields.word.length' = '2'     -- field length limit
);
When the upstream produces 500,000 records per second but the downstream consumes only 100,000 per second, backpressure occurs.
-- drop the table
drop table blackhole_table;
-- blackhole sink
CREATE TABLE blackhole_table (
    word STRING,
    c BIGINT
) WITH ('connector' = 'blackhole');
-- run the query
insert into blackhole_table
select word, count(1) as c
from words
group by word;
Enable MiniBatch and pre-aggregation
After pre-aggregation, the amount of data the upstream sends downstream is reduced, which relieves the backpressure; with it enabled, no backpressure occurs inside Flink.
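Why two-phase aggregation reduces the data volume can be sketched like this (plain Python, an analogy rather than Flink internals): each upstream subtask first combines its local batch into partial counts, so far fewer records cross to the downstream global aggregation.

```python
from collections import Counter

# Records buffered by two upstream subtasks (e.g. one MiniBatch buffer each).
subtask_batches = [
    ["aa", "bb", "aa", "aa", "cc"],
    ["bb", "bb", "aa", "cc", "cc"],
]

# Without pre-aggregation: every record is shuffled downstream.
records_sent_without = sum(len(b) for b in subtask_batches)  # 10

# Phase 1 (local): each subtask combines its batch into partial counts.
partials = [Counter(batch) for batch in subtask_batches]
records_sent_with = sum(len(p) for p in partials)  # 6 partial records

# Phase 2 (global): the downstream task merges the partial counts.
global_counts = Counter()
for p in partials:
    global_counts.update(p)

print(records_sent_without, records_sent_with)  # 10 6
print(dict(global_counts))  # {'aa': 4, 'bb': 3, 'cc': 3}
```

The final counts are identical either way; only the number of records crossing the network changes, which is exactly what eases the pressure on the downstream task.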
set table.exec.mini-batch.enabled=true;
set table.exec.mini-batch.allow-latency='5 s';
set table.exec.mini-batch.size=1000;
set table.optimizer.agg-phase-strategy=TWO_PHASE;
-- mysql sink
CREATE TABLE word_count (
    word STRING,
    c BIGINT,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata',
    'table-name' = 'word_count',
    'username' = 'root',
    'password' = '123456'
);

insert into word_count
select word, count(1) as c
from words
group by word;
drop table word_count;
CREATE TABLE word_count (
    word STRING,
    c BIGINT,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata',
    'table-name' = 'word_count',
    'username' = 'root',
    'password' = '123456',
    'sink.buffer-flush.max-rows' = '1000', -- max rows per flush batch; increases latency
    'sink.parallelism' = '3'               -- higher write parallelism; increases cost
);

insert into word_count
select word, count(1) as c
from words
group by word;
-- hbase sink
drop table hbase_word_count;
CREATE TABLE hbase_word_count (
    word STRING,
    info ROW<c BIGINT>,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'word_count',
    'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181',
    'sink.parallelism' = '3',              -- write parallelism
    'sink.buffer-flush.max-rows' = '3000'  -- rows per flush batch
);

-- first create the table in the HBase shell
create 'word_count','info'

-- write the data into hbase
insert into hbase_word_count
select word, ROW(c) as info
from (
    select word, count(1) as c
    from words
    group by word
) as a;
When the upstream produces data faster than the downstream can consume it, Flink experiences backpressure. Backpressure propagates from downstream to upstream until the source task slows down its data intake, which prevents the job from failing.
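This propagation can be approximated with a bounded buffer between two tasks (a toy sketch using plain Python threads, not Flink's credit-based network stack): when the downstream buffer fills up, the producer blocks instead of failing, so the slowdown travels back to the source.

```python
import queue
import threading
import time

buf = queue.Queue(maxsize=5)   # bounded buffer between two "tasks"
produced, consumed = [], []

def source():
    for i in range(20):
        buf.put(i)             # blocks when the buffer is full -> backpressure
        produced.append(i)

def slow_sink():
    for _ in range(20):
        consumed.append(buf.get())
        time.sleep(0.001)      # downstream is slower than upstream

t1 = threading.Thread(target=source)
t2 = threading.Thread(target=slow_sink)
t1.start(); t2.start()
t1.join(); t2.join()

# All 20 records arrive in order: the source slowed down rather than crashing
# or dropping data.
print(len(produced), len(consumed))  # 20 20
```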
Handling backpressure inside Flink
Increase the parallelism of the Flink job
Increasing parallelism is equivalent to adding resources, so the cost goes up.
-- flink sql
SET 'parallelism.default' = '2';
Enable MiniBatch and pre-aggregation
Enabling them increases latency.
set table.exec.mini-batch.enabled=true;
-- maximum buffering latency
set table.exec.mini-batch.allow-latency='5 s';
-- batch size
set table.exec.mini-batch.size=1000;
-- enable pre-aggregation
set table.optimizer.agg-phase-strategy=TWO_PHASE;
Write data to the external system in batches
MySQL
'sink.buffer-flush.max-rows' = '1000', -- max rows per flush batch; increases latency
'sink.parallelism' = '3'               -- higher write parallelism; increases cost
HBase
'sink.buffer-flush.max-rows' = '1000', -- max rows per flush batch; increases latency
'sink.parallelism' = '3'               -- higher write parallelism; increases cost