李昂
高级数据研发工程师
Apache Doris & Hudi Contributor
部门成立早期, 为了应对业务的快速增长, 数仓架构采用了最直接的Lambda架构
Lambda整体架构如下
此时的架构存在以下缺陷
基于上述Lambda架构存在的缺陷, 我们希望对其作出改进, 实现以下目的
对比项 \ 选型 | Apache Hudi | Apache Iceberg | Apache Paimon |
---|---|---|---|
增量实时upsert支持&性能 | 好 | 较好 | 好 |
存量离线insert支持&性能 | 好 | 较好 | 好 |
增量消费 | 支持 | 依赖Flink State | 支持 |
社区活跃度 | Fork 2.4K , Star 4.6K | Fork 1.8K , Star 4.9K | Fork 0.6K , Star 1.4K |
Doris-Multi-Catalog支持 | 1.2+ 支持 | 1.2+ 支持 | 2.0+ 支持 |
综合考虑以下几点
我们最终选定使用Apache Hudi作为数据湖底座
实时流 join 是事实数仓的痛点之一, 在我们的场景下, 一条事实数据, 需要与多个维度的数据做关联, 例如一场司法拍卖, 需要关联企业最新名称、董监高、企业性质、上市信息、委托法院、询价评估机构等多个维度;一方面, 公司与董监高是1:N的对应关系, 无法实现一条写入, 多条更新; 另一方面, 企业最新名称的变更, 可能涉及到历史冷数据的更新
方案描述
通过FlinkSQL实现增量数据的计算, 每日因为状态TTL过期或者lookup表变更而没有被命中的数据, 通过凌晨的离线调度进行修复
优点
缺点
方案描述
使用MySQL实现数仓分层, 为每张上游表, 都开发lookup逻辑, Hudi只负责做MySQL表的镜像
优点
缺点
所以最终方案选定为第二种 : **MySQL中间表方案**
, 优化后的整体架构如下
根据上述方案, 我们的数据写入是完全镜像于每个flink job的产出MySQL表, 绝大部分表日更新量在50w~300w, 为了保证写入的稳定性, 我们决定采用MOR表
在选择index的时候, 因为BLOOM随着数据量的上升, 瓶颈出现比较快, 我们的候选方式有FLINK_STATE与BUCKET, 综合考虑以下几点要素
我们最终选择使用BUCKET
同时, 综合参考社区推荐与相关最佳实践的文献,
最终, 我们设置bucket_num为128
为了快速整合到历史已经上线的表, 存量数据的快速导入同样也是必不可少的, 通过官网学习, 我们设计了两种方案
在分别对上述2种方案进行测试后, 我们决定采用bulk_insert的方式, 最大的因素还是大并发的Upsert在第一次写入后, 需要的compaction资源非常大, 需要在第一次compaction后再次调整运行资源, 不便于自动化
综合考虑运维难度与资源分配后, 我们决定采用异步调度的方式, 因为我们读的都是RO表, 所以对Compaction频率和单次Compaction时间都有限制, 目前的方案是Compacion Plan由同步任务生成, Checkpoint Interval为1分钟, 触发策略为15次Commits
整合实时链路与离线链路, 所有产出表均由实时逻辑产出
配合Doris多源Catalog, 完成数据整合, 打破数据孤岛
之前Hive的每日数据由单独离线集群通过凌晨的多路归并完成多版本合并, 目前只需要一个实时集群
在我们测试写入的时候, Checkpoint时间比较长, 而且会有反压产生, 追踪StreamWriteFunction.processElement()方法, 发现数据缓情况如下
为了将flush的压力分摊开, 我们的方案就是减小buffer
ps : 默认write.task.max.size必须大于228M
最终的参数 :
-- index 'index.type' = 'BUCKET', 'hoodie.bucket.index.num.buckets' = '128', -- write 'write.tasks' = '4', 'write.task.max.size' = '512', 'write.batch.size' = '8', 'write.log_block.size' = '64',
当表结构中有TIMESTAMP(0)数据类型时, 在使用bulk_insert写入存量数据后, 对接upsert流并进行compaction时, 会报错
Caused by: java.lang.IllegalArgumentException: INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.
提交issue https://github.com/apache/hudi/issues/9804 与社区沟通
最终发现是TIMESTAMP类型, 目前只对TIMESTAMP(3)与TIMESTAMP(6)进行了parquet文件与avro文件的类型标准化
解决方法是暂时使用TIMESTAMP(3)替代TIMESTAMP(0)
将Hudi表信息同步到Hive原数据时, 遇到报错, 且无法通过修改pom文件依赖解决
java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder
与社区沟通, 发现了相同的问题 https://github.com/apache/hudi/issues/3042
解决方法是修改源码的 packaging/hudi-flink-bundle/pom.xml
, 加入
<relocation> <pattern>org.apache.parquet</pattern> <shadedPattern>${flink.bundle.shade.prefix}org.apache.parquet</shadedPattern> </relocation>
并使用
mvn clean install package -Dflink1.17 -Dscala2.12 -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -T 10
手动install源码, 在程序的pom文件中, 使用自己编译的jar包
当使用FlinkSQL TIMESTAMP(3)数据类型写入Hudi, 并开启Hive Sync的时, 查询Hive中的数据, timestamp类型总是比原值多8小时
原因是Hudi写入数据时, 支持UTC时区, 详情见issue https://github.com/apache/hudi/issues/9424
目前的解决方法是写入数据时, 使用FlinkSQL的 CONVERT_TZ
函数
insert into dwd select id,CAST(CONVERT_TZ(CAST(op_ts AS STRING), 'Asia/Shanghai', 'UTC') AS TIMESTAMP(3)) op_ts from ods;
在TaskManager初始化阶段, 偶尔遇到NPE, 且调用栈如下
java.lang.NullPointerException: null at org.apache.hudi.common.config.HoodieConfig.setDefaults(HoodieConfig.java:123)
通过与社区交流, 发现是ReflectionUtils的CLAZZ_CACHE使用HashMap存在线程安全问题
解决方法是引入社区提供的PR : https://github.com/apache/hudi/pull/9786 通过ConcurrentHashMap解除线程安全问题
对接Pushgateway、Prometheus与Grafana, 通过图形化更直截了当的监控Hudi内部相关服务、进程的内存与CPU占用情况, 做到
目前是采用封装工具类的方式, 让每个开发同学在产出一张结果表的同时, 在同一个job中启动一条Hudi同步链路, 缺少对Hudi同步任务的统一管理与把控, 后续准备对所有Hudi链路迁出, 进行统一的任务整合与元数据管理
后续计划中我们希望在1.0发行版中可以正式将CONSISTENT_HASHING BUCKET投入到线上环境, 现在线上许多3e~5e量级的表都是提前按照10e数据量来预估资源与bucket_num, 有资源浪费的情况, 希望可以通过引入一致性hash的bucket索引, 来解决上述问题