C/C++教程

flink-优化(MiniBatch_Local-Global,反压)

本文主要是介绍flink-优化(MiniBatch_Local-Global,反压),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

8、优化

1、MiniBatch 聚合

flink默认是每一条数据都会取更新状态

MiniBatch :缓存一批数据一起更新状态,优点:增加吞吐量,缺点:增加延迟-

  • 开启MiniBatch

    -- sql中开启
    -- 开启
    set table.exec.mini-batch.enabled=true; 
    -- 最大缓存时间
    set table.exec.mini-batch.allow-latency='5 s'; 
    -- 批次大小
    set table.exec.mini-batch.size=1000;
    

2、Local-Global 聚合

开启预聚合需要先开启MiniBatch

set table.exec.mini-batch.enabled=true; 
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s'; 
-- 批次大小
set table.exec.mini-batch.size=1000;
-- 开启预聚合
set table.optimizer.agg-phase-strategy=TWO_PHASE;
  • 示例

    -- 删除表
    drop table words;
    -- source 表
    CREATE TABLE words (
        word STRING
    ) WITH (
    'connector' = 'datagen',
     'rows-per-second' = '1000000', -- 每秒生成的数据行数据
     'fields.word.length' = '2' --字段长度限制
    );
    
    

    上游生产数据的速度时50万每秒,下游消费数据的速度10万每秒 ---- 反压

    -- 删除表
    drop table blackhole_table;
    -- 黑洞
    CREATE TABLE blackhole_table (
    	word STRING,
        c BIGINT
    )
    WITH ('connector' = 'blackhole')
    -- 执行查询
    insert into blackhole_table
    select word,count(1) as c from 
    words
    group by word
    

    开启minibatch和预聚合

    预聚合之后上游发生到数据下游数据量会减少,可以解决反压

    flink内部已经欸有发生反压了

    set table.exec.mini-batch.enabled=true; 
    set table.exec.mini-batch.allow-latency='5 s'; 
    set table.exec.mini-batch.size=1000;
    set table.optimizer.agg-phase-strategy=TWO_PHASE;
    
    • 将数据保存到mysql,写入数据的速度只能达到1600/s - 反压
    --mysql sink
    CREATE TABLE word_count (
    	word STRING,
        c BIGINT,
        PRIMARY KEY (word) NOT ENFORCED 
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://master:3306/bigdata',
       'table-name' = 'word_count',
       'username' = 'root',
       'password' = '123456'
    )
    
    insert into word_count
    select word,count(1) as c from 
    words
    group by word
    
    • 数据写入mysql,增加批次大小,和提高并行度,可以解决反压
    drop table word_count;
    CREATE TABLE word_count (
    	word STRING,
        c BIGINT,
        PRIMARY KEY (word) NOT ENFORCED 
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://master:3306/bigdata',
       'table-name' = 'word_count',
       'username' = 'root',
       'password' = '123456',
       'sink.buffer-flush.max-rows'='1000' ,-- 每批次最大值,会增加延迟
       'sink.parallelism' ='3' --提高写入数据并行度,增加成本
    );
    
    insert into word_count
    select word,count(1) as c from 
    words
    group by word;
    
    • flink将写入hbase
    -- hbase sink
    drop table hbase_word_count;
    CREATE TABLE hbase_word_count (
     word STRING,
     info ROW<c BIGINT>,
     PRIMARY KEY (word) NOT ENFORCED
    ) WITH (
     'connector' = 'hbase-1.4',
     'table-name' = 'word_count',
     'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181',
     'sink.parallelism' = '3', -- 写入数据并行度
      'sink.buffer-flush.max-rows' = '3000'  -- 写入数据批次大小
    );
    
    --先再habse中创建表
    create  'word_count','info'
    
    --- 将数据写入hbase
    insert into hbase_word_count
    select word,ROW(c) as info from (
        select word,count(1) as c from 
        words
        group by word
    ) as a
    

9、反压

上游生产数据速度比下游消费数据速度要大,flink就会发生反压,反压会从下游向上游传播,直到sourcetask会降低拉取数据速度,避免flink任务执行报错

  • flink内部反压

    • 增加flink任务的并行度

      增加并行度相当于就是增加资源,成本会增加

      -- flink sql
      SET 'parallelism.default' = '2';
      
    • 开启MiniBatch和预聚合

      开启之后会增加延迟

      set table.exec.mini-batch.enabled=true; 
      -- 最大缓存时间
      set table.exec.mini-batch.allow-latency='5 s'; 
      -- 批次大小
      set table.exec.mini-batch.size=1000;
      -- 开启预聚合
      set table.optimizer.agg-phase-strategy=TWO_PHASE;
      
  • 将数据保持到外部系统

    • mysql

      -- 每批次最大值,会增加延迟
      'sink.buffer-flush.max-rows'='1000' 
      --提高写入数据并行度,增加成本
      'sink.parallelism' ='3' 
      
  • Hbase

    • flink查询hbase可以开启异步IO,lookup.async=true,只支持hbase2.2以上版本
    -- 每批次最大值,会增加延迟
    'sink.buffer-flush.max-rows'='1000' 
    --提高写入数据并行度,增加成本
    'sink.parallelism' ='3' 
    
这篇关于flink-优化(MiniBatch_Local-Global,反压)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!