C/C++教程

两个ACID平台的故事

本文主要是介绍两个ACID平台的故事,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Apache HUDI vs Delta Lake

像基于系统的Data Lake一样,最近对Hadoop的ACID合规性也获得了很大的吸引力,Databricks Delta Lake和Uber的Hudi成为了主要的贡献者和竞争对手。 两者都通过在"Parquet"文件格式中提供不同的抽象风格解决了一个主要问题; 很难选择一个比另一个更好的选择。 在此博客中,我们将使用一个非常基本的示例来了解这些工具的工作原理。 我们将让读者将其功能视为利弊。

我们将采用相反的方法,如本系列下一篇文章中的内容,我们将讨论诸如Data Lake之类的Hadoop的重要性,以及为什么首先会出现对诸如Delta Lake/ Hudi之类的系统的需求,以及数据工程师过去如何孤立地进行构建 Lakes的易错ACID系统。

环境设置源数据库:AWS RDS MySQL

CDC工具:AWS DMS

Hudi设置:AWS EMR 5.29.0

Delta Lake设置:Databricks运行时6.1

对象/文件存储:AWS S3

通过选择和根据基础结构可用性; 上面的工具集被考虑用于演示; 也可以使用以下替代方法

源数据库:任何基于传统/基于云的RDBMS

CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定义Binlog Parser

Hudi

设置:开源/企业Hadoop上的Apache Hudi

设置:开源/企业Hadoop对象/文件上的

Delta Lake:ADLS / HDFS

数据准备步骤:

create database demo;

use demo;

create table hudi_delta_test(pk_id integer,

name varchar(255),

value integer,

zupdated_at timestamp default now() on update now(),created_at timestamp default now(),constraint pk primary key(pk_id));insert into hudi_delta_test(pk_id,name,value) values(1,‘apple’,10);insert into hudi_delta_test(pk_id,name,value) values(2,‘samsung’,20);insert into hudi_delta_test(pk_id,name,value) values(3,‘dell’,30);insert into hudi_delta_test(pk_id,name,value) values(4,‘motorola’,40);

Apache HUDI vs Delta Lake

> 1. Here’s the state of MySQL table

现在,我们使用DMS将数据加载到S3中的某个位置,并使用文件夹名称full_load标识该位置。 为了坚持标题; 我们将跳过DMS的设置和配置。 这是满载后S3的屏幕截图。

Apache HUDI vs Delta Lake

> 2. Full Loaded Data in S3

现在,让我们在MySQL表中执行一些插入/更新/删除操作。

insert into hudi_delta_test(pk_id,name,value) values(4,‘motorola’,40);

update hudi_delta_test set value=201 where pk_id=2;

delete from hudi_delta_test where pk_id=3;

Apache HUDI vs Delta Lake

> 3. MySQL table after I/U/D operations

让我们再次跳过DMS魔术,将CDC数据按以下方式加载到S3。

Apache HUDI vs Delta Lake

> 4. CDC Data in S3

注意:DMS会填充一个名为" Op"的附加字段,表示"操作",并分别具有用于插入,更新和删除记录的值I / U / D。 以下屏幕截图仅显示了CDC数据的内容。 屏幕截图来自Databricks笔记本,仅为了方便起见,而不是强制性的。

df=spark.read.parquet(‘s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test’)

df()

Apache HUDI vs Delta Lake

> 5. I/U/D flags in CDC Data

现在让我们从真实的游戏开始; 当DMS继续将CDC事件传送到S3时(对于Hudi和Delta Lake),此S3成为数据源,而不是MySQL。 作为这两种工具的最终状态,我们旨在获得一致的统一视图,如上面[1]所示。

使用Apache HUDI

HUDI以两种方式处理UPSERTS [1]:

· 写入时复制(CoW):数据以列格式(Parquet)存储,并且在写入过程中更新会创建文件的新版本。 此存储类型最适合于读取繁重的工作负载,因为数据集的最新版本始终在有效的列式文件中可用。

· 读取时合并(MoR):数据以列(Parquet)格式和基于行(Avro)格式的组合存储; 更新记录到基于行的"增量文件"中,并在以后进行压缩以创建新版本的列式文件。 此存储类型最适合于繁重的写工作负载,因为新提交会以增量文件的形式快速写入,但是读取数据集需要将压缩的列式文件与增量文件合并。

使用以下配置打开Spark Shell并导入相关库

spark-shell — conf “spark.serializer=org.apache.spark.serializer.KryoSerializer” — conf “spark.sql.hive.convertMetastoreParquet=false” — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

import org.apache.spark.sql.SaveMode

import org.apache.spark.sqlctions._

import org.apache.hudi.DataSourceWriteOptions

import org.apache.hudi.config.HoodieWriteConfig

import org.apache.hudi.hive.MultiPartKeysValueExtractor

使用CoW:

val inputDataPath="s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"val hudiTableName="hudi_cow"val hudiTablePath="s3://development-dl/demo/hudi-delta-demo/hudi_cow"val hudiOptions=Map[String,String] ( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”, DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “COPY_ON_WRITE”, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”, DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName )val temp=spark.read.format(“parquet”).load(inputDataPath)val fullDF=temp.withColumn(“Op”,lit(‘I’))fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

由于我们在Hudi选项中使用了Hive自动同步交易,因此将在Hive中创建一个名为" hudi_cow"的表。 该表是使用具有Hoodie格式的Parquet SerDe创建的。

Apache HUDI vs Delta Lake

> Hive CoW Table DDL

这是表的内容:

Apache HUDI vs Delta Lake

> Hive CoW Table (After Full Load)

val updateDF=spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

相同的配置单元表" hudi_cow"将填充最新的UPSERTED数据,如下面的屏幕截图所示

Apache HUDI vs Delta Lake

> Hive CoW Table (After Delta Merge)

如CoW定义中所述,当我们以hudi格式将updateDF写入相同的S3位置时,Upserted数据在写入时被复制,并且快照和增量数据仅使用一个表。

使用MoR:

val inputDataPath="s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"val hudiTableName="hudi_mor"val hudiTablePath="s3://development-dl/demo/hudi-delta-demo/hudi_mor"val hudiOptions=Map[String,String] ( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”, DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “MERGE_ON_READ”, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”, DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName )val temp=spark.read.format(“parquet”).load(inputDataPath)val fullDF=temp.withColumn(“Op”,lit(‘I’))fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

将在Hive中创建两个名为" hudi_mor"和" hudi_mor_rt"的表。 hudi_mor是经过读取优化的表,将具有快照数据,而hudi_mor_rt将具有增量和实时合并数据。 数据被压缩,并以频繁的压缩间隔提供给hudi_mor。 hudi_mor_rt利用Avro格式存储增量数据。 就像定义中所说的MoR一样,通过hudi_mor_rt读取的数据将即时合并。 这对于高可更新源表很有用,同时提供了一致且不是最新读取的优化表。

注意:" hudi_mor"和" hudi_mor_rt"都指向相同的S3存储桶,但定义为不同的存储格式。

Apache HUDI vs Delta Lake

> DDL For MoR Read Optimized (Snapshot) Table

Apache HUDI vs Delta Lake

> DDL For Real-Time/Incremental MoR Table.

满载后,两个表的内容相同,如下所示:

Apache HUDI vs Delta Lake

> Hive MoR Tables after Full Load

val updateDF=spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

表hudi_mor在很短时间内就具有相同的旧内容(因为演示中的数据很小,并且很快会被压缩),但是只要合并命令成功存在,表hudi_mor_rt就会填充最新数据。

Apache HUDI vs Delta Lake

> Hive MoR table (Real-Time) after Merge

现在,让我们看一下这些Hudi格式表在S3日志中发生的情况。 在底层存储格式保持拼凑的同时,ACID通过日志方式进行管理。 通常生成以下类型的文件:

hoodie_partition_metadata:这是一个小文件,包含有关给定分区中partitionDepth和最后一次commitTime的信息

hoodieperties:表名称,类型存储在此处。

commit and clean:文件统计信息和有关正在写入的新文件的信息,以及诸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其他相关审核字段之类的信息,存储在这些文件中。 这些文件是为每次提交生成的。

以上3个文件对于CoW和MoR类型的表都是通用的。 但是,对于MoR表,存在为UPSERTED分区创建的avro格式的日志文件。 以下屏幕快照中的第一个文件是CoW表中不存在的日志文件。

Apache HUDI vs Delta Lake使用三角洲湖:

使用下面的代码片段,我们以镶木地板格式读取满载数据,并以增量格式将其写入不同的位置

from pyspark.sqlctions import *inputDataPath="s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"deltaTablePath="s3://development-dl/demo/hudi-delta-demo/delta_table"fullDF=spark.read.format(“parquet”).load(inputDataPath)fullDF=fullDF.withColumn(“Op”,lit(‘I’))fullDF.write.format(“delta”).mode(“overwrite”).save(deltaTablePath)

Apache HUDI vs Delta Lake

> Steps to load the data in delta format

在Databricks笔记本的SQL界面中使用以下命令,我们可以创建一个Hive外部表," using delta"关键字包含基础SERDE和FILE格式的定义,无需特别提及。

%sqlcreate table delta_table using deltalocation ‘s3://development-dl/demo/hudi-delta-demo/delta_table’

Apache HUDI vs Delta Lake

> Create Table statement using Delta Format

该表的DDL如下所示。

%sqlshow create table delta_table

Apache HUDI vs Delta Lake

> Hive Delta formatted Table DDL

预期的表包含与完整加载文件中相同的所有记录。

%sqlselect * from delta_table

Apache HUDI vs Delta Lake

> Hive Table content after full load

使用以下命令读取CDC数据并在Hive中注册为临时视图

updateDF=spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)updateDF.createOrReplaceTempView(“temp”)

Apache HUDI vs Delta Lake

> Reading CDC Data

MERGE COMMAND:下面是执行UPSERT MAGIC的MERGE SQL,为方便起见,它已作为SQL单元执行,也可以在spark.sql()方法调用中很好地执行

%sqlMERGE INTO delta_table targetUSING (SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at FROM temp latest_changes INNER JOIN ( SELECT pk_id, max(updated_at) AS MaxDate FROM temp GROUP BY pk_id) cm ON latest_changes.pk_id=cm.pk_id AND latest_changes.updated_at=cm.MaxDate) as sourceON source.pk_id==target.pk_idWHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *

Apache HUDI vs Delta Lake

MERGE之后,Hive中的delta_table的内容。 更新了!!!!

%sqlselect * from delta_table

Apache HUDI vs Delta Lake

与Hudi一样,在Delta Lake的情况下,底层文件存储格式也是"镶木地板"。 Delta提供带有日志和版本控制的ACID功能。 让我们看看S3在满载和CDC合并后发生了什么。

Apache HUDI vs Delta Lake

> S3 Location containing 1 parquet file after full load

增量日志包含JSON格式的日志,其中包含有关每次提交后的架构和最新文件的信息。

Apache HUDI vs Delta Lake

> Delta Log after Full Load

对于CDC合并,由于可以插入/更新或删除多个记录。 初始拼花地板文件的内容分为多个较小的拼花地板文件,这些较小的文件被重写。 如果对表进行了分区,则仅与更新的分区相对应的CDC数据将受到影响。 初始拼花地板文件仍存在于该文件夹中,但已从新日志文件中删除。 如果我们在此表上运行VACUUM,则可以物理删除该文件。 这些较小的文件也可以通过使用OPTIMIZE命令[6]来串联。

Apache HUDI vs Delta Lake

> S3 Location after CDC Merge

Delta Log附加了另一个JSON格式的日志文件,该文件存储架构和指向最新文件的文件指针。

Apache HUDI vs Delta Lake

> Delta Log after CDC Merge

在两个示例中,我都按原样保留了删除的记录,并且可以通过Op='D’进行标识,这是故意进行的以显示DMS的功能,但是,下面的参考资料显示了如何将此软删除转换为 轻松删除。

希望这是一个有用的比较,并将有助于做出明智的决定,以选择我们数据湖中的任何可用工具集。 我更偏向Delta,因为Hudi目前不支持PySpark。

关于作者:

Vibhor Goyal是Punchh的数据工程师,他正在努力构建Data Lake及其应用程序,以满足多种产品和分析需求。

这篇关于两个ACID平台的故事的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!