我是一位 Snowflake 的首席解决方案架构师一职 ,拥有超过 17 年的数据策略、架构和工程经验。在这里提到的观点仅代表我个人,并不一定反映我当前、过往或未来的雇主的观点。
在之前的帖子中,Robert Thompson 比较了在 Snowflake 和 Databricks 上运行查询。这是诚心比较两个平台的一次尝试,但大多数“基准测试”中,情况可能并非表面所见。
Databricks 与 Snowflake 的数据对比By Robert Thompsonmedium.com这篇文章发表于2024年11月,并提到了以下内容:
因为它还没有进入GA阶段,也不适合用于生产环境,因此所有这些测试都没有使用Iceberg。
与其测试Iceberg的GA功能,基准测试使用的是Parquet格式的外部表,这不利用元数据统计信息来取得像Snowflake那样的性能优势,许多人对此已经习惯了。此外,最近的一篇LinkedIn帖子表明,Iceberg表的性能比外部Parquet表高25倍。Iceberg表在开放源数据格式上表现出色的性能,无需借助Snowflake计算资源,就可以用外部引擎进行查询。
为什么不继续罗伯特的工作,看看所谓的Snowflake的Managed Iceberg表与最初公布的结果有何不同?
设置部分我认为给那些想自己尝试的人提供可重复的步骤很重要。罗伯特给了我他用过的数据链接: https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=azureml-opendatasets。我最初加载了原始数据,但发现列名和数据格式随着时间变化。相反,我启动了一个Databricks集群,并使用笔记本加载了整个数据范围,使用了Azure开放数据集Python库,然后导出了到我的blob存储中。
# 这是一个预览版的包。 # 需要在 Databricks 集群中通过 pip 安装 azureml-opendatasets。 https://learn.microsoft.com/azure/data-explorer/connect-from-databricks#install-the-python-library-on-your-azure-databricks-cluster !pip install azureml-opendatasets from azureml.opendatasets import NycTlcYellow from datetime import datetime from dateutil import parser ## 获取完整的时间范围 start_date = parser.parse('2009-01-01') end_date = parser.parse('2018-12-31') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_df = nyc_tlc.to_spark_dataframe() # 验证是否返回了数据 display(nyc_tlc_df.limit(5)) #### ## 请根据您的 Blob 存储帐户进行相应的修改 #### storage_account_name = "<your_storage_account>" storage_account_access_key = "<your_access_key>" container_name = "<your_container>" # 使用访问密钥来配置 Spark 会话 spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_access_key) # 存放 Parquet 文件的路径 output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/yellow" # 将 DataFrame 写入 Parquet nyc_tlc_df.write.mode("overwrite").parquet(output_path)
当数据被导出到Azure后,我就能够连接Azure的Snowflake账户(你也可以使用AWS或GCP中的账户)。使用存储集成功能连接了Azure存储账户之后,我为Iceberg用例创建了一个外部表。从那里,我将Azure Blob存储中的Parquet数据加载到了Snowflake Iceberg表结构中。
--创建一个新的存储集成以连接到blobstore USE ROLE ACCOUNTADMIN; CREATE STORAGE INTEGRATION azure_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'AZURE' ENABLED = TRUE AZURE_TENANT_ID = '<your Azure Tenant>' STORAGE_ALLOWED_LOCATIONS = ('azure://<storage_account>.blob.core.windows.net/<container>/'); GRANT USAGE ON INTEGRATION azure_int to role sysadmin; --我们还需要一个外部卷来写入Iceberg CREATE OR REPLACE EXTERNAL VOLUME AZURE_EXT_VOLUME STORAGE_LOCATIONS = ( ( NAME = 'azure-iceberg-volume' STORAGE_PROVIDER = 'AZURE' STORAGE_BASE_URL = 'azure://<storage_account>.blob.core.windows.net/<container>/' AZURE_TENANT_ID = '<your Azure Tenant>' ) ); GRANT USAGE ON EXTERNAL VOLUME AZURE_EXT_VOLUME TO ROLE SYSADMIN; USE ROLE SYSADMIN; --创建一个新的数据库、模式和阶段 CREATE DATABASE SNOW_DB; CREATE SCHEMA SNOW_SCHEMA; CREATE STAGE taxi_data_stage URL='azure://<storage_account>.blob.core.windows.net/<container>/yellow' STORAGE_INTEGRATION = azure_int; --创建加载Parquet文件的文件格式 CREATE OR REPLACE FILE FORMAT parquet_file_format TYPE = 'parquet' USE_VECTORIZED_SCANNER = TRUE; --创建一个Iceberg表 create or replace ICEBERG TABLE NYCTLCYELLOW_IB ( "vendorID" VARCHAR(16777216), "tpepPickupDateTime" TIMESTAMP_NTZ(6), "tpepDropoffDateTime" TIMESTAMP_NTZ(6), "passengerCount" NUMBER(38,0), "tripDistance" FLOAT, "puMonth" NUMBER(38,0), "tipAmount" FLOAT, "startLon" FLOAT, "paymentType" VARCHAR(16777216), "endLat" FLOAT, "startLat" FLOAT, "fareAmount" FLOAT, "mtaTax" FLOAT, "improvementSurcharge" VARCHAR(16777216), "endLon" FLOAT, "tollsAmount" FLOAT, "totalAmount" FLOAT, "puLocationId" VARCHAR(16777216), "rateCodeId" NUMBER(38,0), "storeAndFwdFlag" VARCHAR(16777216), "doLocationId" VARCHAR(16777216), "extra" FLOAT, "puYear" NUMBER(38,0) ) EXTERNAL_VOLUME = 'AZURE_EXT_VOLUME' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'NYCTLCYELLOW_IB/'; --让我们将数据加载到NYCTLCYELLOW_IB COPY INTO NYCTLCYELLOW_IB FROM @taxi_data_stage FILE_FORMAT = (format_name = PARQUET_FILE_FORMAT) MATCH_BY_COLUMN_NAME = 'CASE_SENSITIVE' PATTERN='.*[.]parquet'; --如果您希望优化性能 --让我们根据tpepPickupDateTime重新排序数据 INSERT OVERWRITE INTO NYCTLCYELLOW_IB SELECT * FROM NYCTLCYELLOW_IB ORDER BY "tpepPickupDateTime";查询执行情况
我没有记录加载结果的情况,而是关注了查询执行的时间。对于其中两个查询,我重写了它们以利用Snowflake的性能优势,这并没有影响数据的输出。虽然选择查询本身没有改变,但在插入数据之后,我使用了INSERT OVERWRITE INTO ORDER BY子句(如上代码所示),这自然地对表进行了聚类操作。这是一步可选的操作,但也是良好的实践,可以防止数据出现倾斜,并有助于Snowflake优化器高效地进行数据修剪。
提供了完整的设置和运行代码,这样其他人可以轻松复现结果。以下是修改后的两个查询。
一般的查询:
--原查询 SELECT "puYear", "puMonth", "totalAmount" FROM ( SELECT "puYear", "puMonth", "totalAmount", ROW_NUMBER() OVER (partition by "puYear", "puMonth" order by "totalAmount") as rn FROM nyctlcyellow_ib ) ranked WHERE ranked.rn = 1; --修改后的查询1. 不需要外层的选择。我使用了QUALIFY(筛选条件) SELECT "puYear", "puMonth", "totalAmount", ROW_NUMBER() OVER (partition by "puYear", "puMonth" order by "totalAmount") as rn FROM nyctlcyellow_ib QUALIFY rn=1; --修改后的查询2. 不需要进行排名,直接取最小值! SELECT "puYear", "puMonth", MIN("totalAmount") FROM nyctlcyellow_ib GROUP BY "puYear", "puMonth" ORDER BY 1,2;
荒谬的提问:
--原始查询在Snowflake上的执行情况。 --注意,Snowflake上的查询时间大约是之前所说的50%。 SELECT count(*) FROM ( SELECT * FROM NYCTLCYELLOW_IB GROUP BY * ) a ; --修改后的查询2。所有行的哈希值唯一计数。 SELECT COUNT (DISTINCT HASH(*)) FROM NYCTLCYELLOW_IB;
以下是一个代码片段,详细说明了我如何执行这些查询并得到结果。这是一个简单的查询示例工作表。每个查询都运行了三次,并关闭了缓存以避免结果被扭曲。
--使用 xs WH USE WAREHOUSE COMPUTE_XS_WH; --关闭仓库并禁用结果缓存 ALTER SESSION SET USE_CACHED_RESULT = FALSE; ALTER WAREHOUSE COMPUTE_XS_WH SUSPEND; --设置查询标签以从 QH 获取我们的结果 ALTER SESSION SET QUERY_TAG = 'Simple:XS'; --运行查询 SELECT * FROM nyctlcyellow_ib ORDER BY "tpepPickupDateTime" DESC LIMIT 1000; --暂停仓库以避免使用 WH 缓存。等待 2 秒... ALTER WAREHOUSE COMPUTE_XS_WH SUSPEND; SELECT SYSTEM$WAIT(2); --运行查询 SELECT * FROM nyctlcyellow_ib ORDER BY "tpepPickupDateTime" DESC LIMIT 1000; --暂停仓库以避免使用 WH 缓存。等待 2 秒... ALTER WAREHOUSE COMPUTE_XS_WH SUSPEND; SELECT SYSTEM$WAIT(2); --运行查询 SELECT * FROM nyctlcyellow_ib ORDER BY "tpepPickupDateTime" DESC LIMIT 1000; --取消设置查询标签 ALTER SESSION UNSET QUERY_TAG; --使用 Medium 仓库 USE WAREHOUSE COMPUTE_M_WH; --设置查询标签以从 QH 获取我们的结果 ALTER SESSION SET QUERY_TAG = 'Simple:M'; --运行简单查询 SELECT * FROM nyctlcyellow_ib ORDER BY "tpepPickupDateTime" DESC LIMIT 1000; --暂停仓库以避免使用 WH 缓存。等待 2 秒... ALTER WAREHOUSE COMPUTE_M_WH SUSPEND; SELECT SYSTEM$WAIT(2); --运行查询 SELECT * FROM nyctlcyellow_ib ORDER BY "tpepPickupDateTime" DESC LIMIT 1000; --暂停仓库以避免使用 WH 缓存。等待 2 秒... ALTER WAREHOUSE COMPUTE_M_WH SUSPEND; SELECT SYSTEM$WAIT(2); --运行查询 SELECT * FROM nyctlcyellow_ib ORDER BY "tpepPickupDateTime" DESC LIMIT 1000; --取消设置查询标签 ALTER SESSION UNSET QUERY_TAG; --完成运行后,您可以通过程序化方式获取查询时间和成本的结果 --根据之前的博客文章,我使用了 $3 的信用成本 select QUERY_TAG, AVG(TOTAL_ELAPSED_TIME / 1000) TIME_IN_SEC, COUNT(*), CASE WHEN ANY_VALUE(WAREHOUSE_SIZE) = 'X-Small' then 0.0003 WHEN ANY_VALUE(WAREHOUSE_SIZE) = 'Medium' THEN 0.0011 END * 3 CREDIT_COST, TIME_IN_SEC * CREDIT_COST QUERY_COST from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY WHERE QUERY_TAG LIKE '%:%' AND QUERY_TYPE = 'SELECT' AND QUERY_TEXT <> 'SELECT SYSTEM$WAIT(2);' AND EXECUTION_STATUS = 'SUCCESS' AND WAREHOUSE_SIZE IS NOT NULL AND START_TIME > CURRENT_DATE - 2 GROUP BY QUERY_TAG ORDER BY 1;成果
这些数字看起来大不一样!简单的查询和选择性查询没有被改动,在Snowflake上使用Iceberg后,这些查询的速度快了大约15倍。
中等和荒诞的查询与未修改的 Databricks 大致相同。此外,对于荒诞的查询,我使用 Iceberg 获得的结果比之前少了大约一半,而且没有任何改动。修改后的查询则便宜了惊人的 3 到 4 倍!
这是结果如下。
最后尽管前一篇文章没有真正理解如何优化 Snowflake 就试图诚实地比较这些引擎。经过一些小的调整,一些结果减少了 80%。在优化查询的情况下,使用 Iceberg 管理表的 Snowflake 不仅速度更快,而且成本更低,比 Databricks 更有优势。
我的几点主要收获如下: