预聚合是 OLAP 系统中常用的一种优化手段,在通过在加载数据时就进行部分聚合计算,生成聚合后的中间表或视图,从而在查询时直接使用这些预先计算好的聚合结果,提高查询性能,实现这种预聚合方法大多都使用物化视图来实现。
Clickhouse 社区实现的 Projection 功能类似于物化视图,原始的概念来源于 Vertica,在原始表数据加载时,根据聚合 SQL 定义的表达式,计算写入数据的聚合数据与原始数据同步写入存储。在数据查询的过程中,如果查询 SQL 通过匹配分析可以通过聚合数据计算得到,直接查询聚合数据减少计算开销,大幅提升查询性能。
Clickhouse Projection 是针对物化视图现有问题,在查询匹配,数据一致性上扩展了使用场景:
ByteHouse 是火山引擎基于 ClickHouse 研发的一款分析型数据库产品,是同时支持实时和离线导入的自助数据分析平台,能够对 PB 级海量数据进行高效分析。具备真实时分析、存储-计算分离、多级资源隔离、云上全托管服务四大特点,为了更好的兼容社区的 projection 功能,扩展 projection 使用场景,ByteHouse 对 Projection 进行了匹配场景和架构上进行了优化。在 ByteHouse 商业客户性能测试 projection 的性能测试,在 1.2 亿条的实际生产数据集中进行测试,查询并发能力提升 10~20 倍,下面从 projeciton 在优化器查询改写和基于 ByteHouse 框架改进两个方面谈一谈目前的优化工作。
为了提高 ByteHouse 对社区有很好的兼容性,ByteHouse 保留了原有语法的支持,projection 操作分为创建,删除,物化,删除数据几个操作。为了便于理解后面的优化使用行为分析系统例子作为分析的对象。
-- 新增projection定义 ALTER TABLE [db].table ADD PROJECTION name ( SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY] ) -- 删除projection定义并且删除projection数据 ALTER TABLE [db].table DROP PROJECTION name -- 物化原表的某个partition数据 ALTER TABLE [db.]table MATERIALIZE PROJECTION name IN PARTITION partition_name -- 删除projection数据但不删除projection定义 ALTER TABLE [db.]table CLEAR PROJECTION name IN PARTITION partition_name
CREATE DATABASE IF NOT EXISTS tea_data; 创建原始数据表 CREATE TABLE tea_data.events( app_id UInt32, user_id UInt64, event_type UInt64, cost UInt64, action_duration UInt64, display_time UInt64, event_date Date ) ENGINE = CnchMergeTree PARTITION BY toDate(event_date) ORDER BY (app_id, user_id, event_type); 创建projection前写入2023-05-28分区测试数据 INSERT INTO tea_data.events SELECT number / 100, number % 10, number % 3357, number % 166, number % 5, number % 40, '2023-05-28 05:11:55' FROM system.numbers LIMIT 100000; 创建聚合projection ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_1 ( SELECT app_id, user_id, event_date, sum(action_duration) GROUP BY app_id, user_id, event_date ); 创建projection后写入2023-05-29分区测试数据 INSERT INTO tea_data.events SELECT number / 100, number % 10, number % 3357, number % 166, number % 5, number % 40, '2023-05-29 05:11:55' FROM system.numbers LIMIT 100000; Note:CnchMergeTree是Bytehouse特有的引擎
ByteHouse 优化器为业界目前唯一的 ClickHouse 优化器方案。ByteHouse 优化器的能力简单总结如下:
借助 bytehouse 优化器强大的能力,针对 projection 原有实现的几点局限性做了优化,下面我们先来看一下社区在 projection 改写的具体实现。
改写实现在非优化器执行模式下,对原始表的聚合查询可通过 aggregate projection 加速,即读取 projection 中的预聚合数据而不是原始数据。计算支持了 normal partition 和 projection partition 的混合查询,如果一个 partition 的 projection 还没物化,可以使用原始数据进行计算。
具体改写执行逻辑:
优化器会将查询切分为不同的 plan segment 分发到 worker 节点并行执行,segment 之间通过 exchange 交换数据,在 plan segment 内部根据 query plan 构建 pipeline 执行,以下面简单聚合查询为例,说明优化器如何匹配 projection。
Q1: SELECT app_id, user_id, sum(action_duration) FROM tea_data.eventsWHERE event_date = '2023-05-29' GROUP BY app_id, user_id
在执行计划阶段优化器尽量的将 TableScan 上层的 Partial Aggregation Step,Projection 和 Filter 下推到 TableScan 中,在将 plan segment 发送到 worker 节点后,在根据查询代价选择合适 projection 进行匹配改写,从下面的执行计划上看,命中 projection 会在 table scan 中直接读取 AggregateFunction(sum, UInt64)的 state 数据,相比于没有命中 projection 的执行计划减少了 AggregaingNode 的聚合运算。
Projection 在创建之后不支持更新 schema,只能创建新的 projection,但是在一些对于 projection schema 变更需求频繁业务场景下,需要同一个查询既能够读取旧 projection 也能读取新 projection,所以在匹配时需要从 partition 维度进行匹配而不是从 projection 定义的维度进行匹配,混合读取不同 projection 的数据,这样会使查询更加灵活,更好的适应业务场景,下面举个具体的实例:
创建新的projection ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_2 ( SELECT app_id, sum(action_duration), sum(cost) GROUP BY app_id ); 写入2023-05-30的数据 INSERT INTO tea_data.events SELECT number / 10, number % 100, number % 23, number % 3434, number % 23, number % 55, '2023-05-30 04:12:43' FROM system.numbers LIMIT 100000; 执行查询 Q2: SELECT app_id, sum(action_duration) FROM tea_data.events WHERE event_date >= '2023-05-28' GROUP BY app_id
当对原始表添加新字段(维度或指标 ),对应 projection 不包含这些字段,这时候为了利用 projection 一般情况下需要删除 projection 重新做物化,比较浪费资源,如果优化器匹配算法能正确处理不存在缺省字段,并使用缺省值参与计算就可以解决这个问题。
ALTER TABLE tea_data.events ADD COLUMN device_id String after event_type; ALTER TABLE tea_data.events ADD COLUMN stay_time UInt64 after device_id; 执行查询 Q3: SELECT app_id, device_id, sum(action_duration), max(stay_time) FROM tea_data.events WHERE event_date >= '2023-05-28' GROUP BY app_id,device_id
Projection 是按照 Bytehouse 的存算分离架构进行设计的,Projecton 数据由分布式存储统一进行管理,而针对 projection 的查询和计算则在无状态的计算节点上进行。相比于社区版,Bytehouse Projection 实现了以下优势:
在 Bytehouse 中,多个 projections 数据与 data 数据存储在一个共享存储文件中。文件的外部数据对 projections 内部的内容没有感知,相当于一个黑盒。当需要读取某个 projection 时,通过 checksums 里面存储的 projection 指针,定位到特定 projection 位置,完成 projection 数据解析与加载。
Projection 写入分为两部分,先在本地做数据写入,产生 part 文件存储在 worker 节点本地,然后通过 dumpAndCommitCnchParts 将数据 dump 到远程共享存储。
随着时间的推移,针对同一个 partition 会存在越来越多的 parts,而 parts 越多查询过滤时的代价就会越大。因此,Bytehouse 在后台进程中会 merge 同一个 partition 的 parts 组成更大的 part,从而减少 part 的数量提高查询的效率。
Bytehouse 采用 MVCC 的方式,针对 mutate 涉及的列,新增一个 delta part 版本存储此次 mutate 涉及到的列。相应地,我们在 mutate 的时候,构造 projection 的 mutate 操作的 inputstream,将 mutate 后的 projection 和原始表数据一起写到同一个 delta part 中。
如下图所示,根据 Bytehouse 的 part 管理方式,针对 mutate 操作或新增物化操作,我们为 part 生成新的 delta part,在下图 part 中,它所管理的三个 projections 由 base part 中的 proj2,delta part#1 中的 proj1',以及 delta part#2 中的 proj3 共同构成。当 parts 加载完成后,delta part#2 会存储 base part 中的 proj2 的指针和 delta part#1 中的 proj1'指针,以及自身的 proj3 指针,对上层提供统一的访问服务。
目前,CNCH 中针对不同数据设计了不同的缓存类型
另外,为了加快 Projection 数据的加载过程,我们新增了 MetaInfoDiskCacheSegment 用于缓存 Projection 相关的元数据信息。
某真实用户场景的数据集,我们利用它对 Projection 性能进行了测试。
该数据集约 1.2 亿条,包含 projection 约 240G 大小,测试机器 80CPU(s) / 376G Mem,配置如下:
开启 Projection 后,针对 1.2 亿条的数据集,查询性能提升 10~20 倍。
CREATE TABLE user.trades( `type` UInt8, `status` UInt64, `block_hash` String, `sequence_number` UInt64, `block_timestamp` DateTime, `transaction_hash` String, `transaction_index` UInt32, `from_address` String, `to_address` String, `value` String, `input` String, `nonce` UInt64, `contract_address` String, `gas` UInt64, `gas_price` UInt64, `gas_used` UInt64, `effective_gas_price` UInt64, `cumulative_gas_used` UInt64, `max_fee_per_gas` UInt64, `max_priority_fee_per_gas` UInt64, `r` String, `s` String, `v` UInt64, `logs_count` UInt32, PROJECTION tx_from_address_hit ( SELECT * ORDER BY from_address ), PROJECTION tx_to_address_hit ( SELECT * ORDER BY to_address ), PROJECTION tx_sequence_number_hit ( SELECT * ORDER BY sequence_number ), PROJECTION tx_transaction_hash_hit ( SELECT * ORDER BY transaction_hash ) ) ENGINE=CnchMergeTree() PRIMARY KEY (transaction_hash, from_address, to_address) ORDER BY (transaction_hash, from_address, to_address) PARTITION BY toDate(toStartOfMonth(`block_timestamp`));
Q1
WITH tx AS ( SELECT * FROM user.trades WHERE from_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC UNION ALL SELECT * FROM user.trades WHERE to_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC, transaction_index DESC ) SELECT * FROM tx LIMIT 100;
Q2
with tx as (select sequence_number, transaction_index, transaction_hash, input from user.trades where from_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc UNION ALL select sequence_number, transaction_index, transaction_hash, input from user.trades where to_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc, transaction_index desc) select sequence_number, transaction_hash, substring(input,1,8) as func_sign from tx order by sequence_number desc, transaction_index desc limit 100 settings max_threads = 1, allow_experimental_projection_optimization = 1, use_uncompressed_cache = true;
Q1
Q2