前段时间,我写过一篇关于 Fluss、Kafka 和 Paimon 数据湖的区别与联系 的文章。当时提到了一些 Fluss 和 Paimon 数据湖结合的内容,不过有些地方还不够深入。所以今天,我们专门来聊一聊 Fluss 和 Paimon 数据湖的深度结合。
先简单回顾一下 Fluss 的基本操作。如果你在 Flink 里创建一个 Fluss 表,并加上 'table.datalake.enabled' = 'true'
这个配置,就可以实现数据湖模式。例如,创建表时的配置如下图所示:
接下来,你需要用命令 ./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081
启动 Fluss 的 compact service 服务。有了这个服务支持,当你往 Fluss 表中写入数据时,这些数据会自动同步到你配置的 Paimon 数据湖。
这个流程听起来很简单,但当我第一次自己动手搭建 Fluss 数据入湖流程时,遇到了一些疑问:
如何查询 Paimon 数据湖中的数据?
如何查询 Fluss 和 Paimon 数据的“联合视图”?
如何只查询 Fluss 中的数据?
今天这篇文章就从这三个问题出发,带大家深入了解 Fluss 与 Paimon 数据湖的结合。此外,我还会聊聊 Fluss 提出的流批一体设计理念。在我看来,这个设计是未来实时数据分析领域的重要亮点。
接下来,让我们一步步拆解这些问题!
这个 Fluss 官网写的有,如下图:
-- read from paimon SELECT COUNT(*) FROM orders$lake; -- we can also query the system tables SELECT * FROM orders$lake$snapshots;
大概的意思就是,如果 ordersKaTeX parse error: Expected 'EOF', got '这' at position 6: lake 这̲样查询,查询的就是基础文件,应…lake$snapshots 这个就是查询该表的全部数据(最全的数据)。同时上面这两种查询都完全继承 Paimon 表的所有功能,就像其他查询工具查询 Paimon 表一样的。
如果要利用 Fluss 和 Paimom 来读取全部的数据,按照下面的 sql 执行:
-- query will union data of Fluss and Paimon SELECT SUM(order_count) as total_orders FROM ads_nation_purchase_power;
此类查询可能比仅查询 Paimon 数据的速度稍慢,但它提供了更高的数据新鲜度。由于数据持续写入表中,每次运行查询时可能会得到不同的结果。
通过上述方法,您可以根据需求选择从 Paimon 或 Fluss+Paimon 中读取数据,以平衡数据新鲜度和分析性能。
其实看到这里我心里是有一个疑问的:*** 利用 Fluss 和 Paimom 来读取全部的数据***,这样查询出来的数据不会重复吗?具体看下面这样例子。
你有一张订单表 orders
,在创建表时加了配置 'table.datalake.enabled' = 'true'
。
你往这张表中写入了以下三条数据:
数据写入过程:
Fluss 日志存储:
Offset 1: 订单 ID = 001,金额 = 100,状态 = Pending Offset 2: 订单 ID = 002,金额 = 200,状态 = Confirmed Offset 3: 订单 ID = 003,金额 = 300,状态 = Shipped
Paimon 数据湖:
Fluss 的 Compact Service 将这些数据同步到 Paimon,Paimon 生成了一个快照:
快照 ID = 1: 订单 ID = 001,金额 = 100,状态 = Pending 订单 ID = 002,金额 = 200,状态 = Confirmed 订单 ID = 003,金额 = 300,状态 = Shipped
查询类型:联合读取(Fluss + Paimon)
你运行以下查询,试图读取 Fluss 和 Paimon 的全部数据:
sqlSELECT * FROM orders;
理论上期望的结果:
订单 ID = 001,金额 = 100,状态 = Pending 订单 ID = 002,金额 = 200,状态 = Confirmed 订单 ID = 003,金额 = 300,状态 = Shipped
可能的问题:重复数据 由于 Fluss 和 Paimon 都存储了一份数据,联合读取时可能会出现重复:
订单 ID = 001,金额 = 100,状态 = Pending (Fluss) 订单 ID = 001,金额 = 100,状态 = Pending (Paimon) 订单 ID = 002,金额 = 200,状态 = Confirmed (Fluss) 订单 ID = 002,金额 = 200,状态 = Confirmed (Paimon) 订单 ID = 003,金额 = 300,状态 = Shipped (Fluss) 订单 ID = 003,金额 = 300,状态 = Shipped (Paimon)
但是经过实操查询出来的数据和期望的结果是一样的,如下图:
订单 ID = 001,金额 = 100,状态 = Pending
订单 ID = 002,金额 = 200,状态 = Confirmed
订单 ID = 003,金额 = 300,状态 = Shipped
为了避免联合读取 Fluss 和 Paimon 数据时出现重复,Fluss 设计了一套机制确保 数据一致性 和 去重。以下将通过详细的机制分析和具体的数据例子来说明 为什么需要记录 Offset,以及如何在读取时处理增量数据和主键去重。
假设的数据写入情况:
Fluss 日志(实时流数据):
Offset 1: 订单 ID = 001,金额 = 100,状态 = Pending Offset 2: 订单 ID = 002,金额 = 200,状态 = Confirmed Offset 3: 订单 ID = 003,金额 = 300,状态 = Shipped
Paimon 快照数据(已合并数据):
快照 ID = 1: 订单 ID = 001,金额 = 100,状态 = Pending 订单 ID = 002,金额 = 200,状态 = Confirmed 订单 ID = 003,金额 = 300,状态 = Shipped
快照与 Offset 的映射:
快照 ID = 1 对应 Fluss Offset = 3
增量写入新数据:
新数据写入 Fluss 日志:
Offset 4: 订单 ID = 003,金额 = 300,状态 = Delivered Offset 5: 订单 ID = 004,金额 = 400,状态 = Pending
联合读取过程:
Fluss 读取 Paimon 数据(快照 ID = 1):
订单 ID = 001,金额 = 100,状态 = Pending 订单 ID = 002,金额 = 200,状态 = Confirmed 订单 ID = 003,金额 = 300,状态 = Shipped
Fluss 从日志读取增量数据(Offset > 3):
Offset 4: 订单 ID = 003,金额 = 300,状态 = Delivered Offset 5: 订单 ID = 004,金额 = 400,状态 = Pending
主键冲突的场景:
Paimon 数据:
订单 ID = 001,金额 = 100,状态 = Pending 订单 ID = 002,金额 = 200,状态 = Confirmed 订单 ID = 003,金额 = 300,状态 = Shipped
Fluss 增量日志(Offset > 3):
Offset 4: 订单 ID = 003,金额 = 300,状态 = Delivered Offset 5: 订单 ID = 004,金额 = 400,状态 = Pending
去重后的结果:
Fluss 检测到 订单 ID = 003
在 Paimon 和 Fluss 中都存在。
根据主键去重规则,使用 Fluss 中最新的记录覆盖 Paimon 中的记录:
最终结果: 订单 ID = 001,金额 = 100,状态 = Pending (Paimon) 订单 ID = 002,金额 = 200,状态 = Confirmed (Paimon) 订单 ID = 003,金额 = 300,状态 = Delivered (Fluss) 订单 ID = 004,金额 = 400,状态 = Pending (Fluss)
通过记录 Offset,Fluss 能够实现以下功能:
区分已写入 Paimon 的数据和 Fluss 中的增量数据
准确定位增量读取的起始位置
提高读取性能
Offset 切换点:
主键去重:
最终结果: 联合读取的结果既包含 Paimon 的历史数据,也包含 Fluss 的实时更新,同时保证数据无重复、状态最新。
写到这里,我又有了一个疑问,它这个查询最终查询了 Paimon 和 Fluss 两个系统,那怎么保证事务的呢?这个我们放在下一篇文章来说。还有这篇文章的第三个问题,也会放在下一篇文章继续说清楚的。欢迎大家一起来讨论大数据技术,关注 大圣数据星球 微信公众号带你搞定数据开发不迷路。