The previous article, 《基于 Ubuntu 玩转 Hudi Docker Demo (2)—— 测试数据写入 Kafka》, showed how to write the test data into the Kafka cluster.

This article shows how to consume that Kafka data with Spark and write it to HDFS as Hudi tables; Hudi is brought into Spark as a jar (the Hudi utilities bundle).

Hudi provides two table types, which support different query types:
Table type | Supported query types |
---|---|
Copy On Write (abbreviated COW) | Snapshot queries and incremental queries |
Merge On Read (abbreviated MOR) | Snapshot queries, incremental queries, and read-optimized queries |
The two table types involve the following trade-offs:

Trade-off | CopyOnWrite | MergeOnRead |
---|---|---|
Data latency | Higher | Lower |
Query latency | Lower | Higher |
Update cost (I/O) | Higher: the whole Parquet file is rewritten | Lower: updates are appended to delta log files |
Write amplification | Higher | Lower, depending on the compaction strategy |
For a MergeOnRead table, choosing between query types involves the following trade-off:
Trade-off | Snapshot Queries | Read Optimized Queries |
---|---|---|
Data latency | Lower | Higher |
Query latency | Higher | Lower |
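For reference, the query type is selected at read time through the Hudi Spark DataSource options (or, for tables synced to Hive, by querying the `_ro`/`_rt` tables shown later in this article). The snippet below is only a minimal sketch: the option constants follow the `DataSourceReadOptions` API that is also used later in this article, the begin instant value is a placeholder, and depending on the Hudi version some combinations (e.g. MOR snapshot reads through the DataSource) may require a newer release or the Hive-synced `_rt` table instead.

```scala
import org.apache.hudi.DataSourceReadOptions

val basePath = "/user/hive/warehouse/stock_ticks_mor"

// Snapshot query (default): base files merged with delta log files at read time.
// Some older releases require a partition glob such as basePath + "/*/*/*/*".
val snapshotDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(basePath)

// Read-optimized query: only the compacted base (Parquet) files, lower query latency
val readOptimizedDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(basePath)

// Incremental query: only records committed after the given instant time (placeholder value)
val incrementalDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "00000000000000")
  .load(basePath)
```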
Enter the adhoc-2 container:

```
sudo docker exec -it adhoc-2 /bin/bash
```
Run the following spark-submit command to start the DeltaStreamer; it consumes the data from the Kafka cluster and writes it to HDFS in COPY_ON_WRITE mode as table stock_ticks_cow:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
```
Run the following spark-submit command to start the DeltaStreamer; it consumes the data from the Kafka cluster and writes it to HDFS in MERGE_ON_READ mode as table stock_ticks_mor:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
```
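As an aside, the same kind of Hudi write can also be issued from the Spark DataFrame API instead of the DeltaStreamer. The sketch below is only illustrative and is not part of the demo steps: the input path `/tmp/batch_1.json`, the target table name `stock_ticks_cow_df`, and the record key / partition / precombine field names (`key`, `date`, `ts`) are assumptions based on the demo's stock-tick schema.

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical DataFrame holding the same JSON stock-tick records
val df = spark.read.json("/tmp/batch_1.json")

df.write.format("org.apache.hudi")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.recordkey.field", "key")       // assumed record key field
  .option("hoodie.datasource.write.partitionpath.field", "date")  // assumed partition field
  .option("hoodie.datasource.write.precombine.field", "ts")       // same ordering field as --source-ordering-field
  .option("hoodie.table.name", "stock_ticks_cow_df")              // hypothetical table, separate from the demo tables
  .mode(SaveMode.Append)
  .save("/user/hive/warehouse/stock_ticks_cow_df")
```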
stock_ticks_cow is partitioned by date; under each partition directory there is a metadata file and a Parquet data file. Commit information can be seen under the .hoodie directory. The stock_ticks_mor table directory can be inspected in the same way.
Next, sync both tables to the Hive metastore with the Hive sync tool so that they can be queried through Spark SQL:

```
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
  --jdbc-url jdbc:hive2://hiveserver:10000 \
  --user hive \
  --pass hive \
  --partitioned-by dt \
  --base-path /user/hive/warehouse/stock_ticks_cow \
  --database default \
  --table stock_ticks_cow

/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
  --jdbc-url jdbc:hive2://hiveserver:10000 \
  --user hive \
  --pass hive \
  --partitioned-by dt \
  --base-path /user/hive/warehouse/stock_ticks_mor \
  --database default \
  --table stock_ticks_mor
```

Syncing the MergeOnRead table registers two Hive tables: stock_ticks_mor_ro for read-optimized queries and stock_ticks_mor_rt for snapshot queries.
Start spark-shell:

```
$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4
```
```
Spark context Web UI available at http://adhoc-2:4040
Spark context available as 'sc' (master = local[2], app id = local-1644547729231).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show tables").show(100, false)
+--------+------------------+-----------+
|database|tableName         |isTemporary|
+--------+------------------+-----------+
|default |stock_ticks_cow   |false      |
|default |stock_ticks_mor_ro|false      |
|default |stock_ticks_mor_rt|false      |
+--------+------------------+-----------+

## Run max timestamp query against COW table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

## Projection Query
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# Merge-On-Read Queries:
# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

# Run Snapshot Query. Notice that the latest timestamp is again 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

# Run Read Optimized and Snapshot project queries
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022707523  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022707523  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022707523  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022707523  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+
```
Exit the docker container and, from the Ubuntu command line, push the second batch of test data into Kafka:

```
cat docker/demo/data/batch_2.json | kafkacat -b kafkabroker -t stock_ticks -P
```
Enter the adhoc-2 container again:

```
sudo docker exec -it adhoc-2 /bin/bash
```
Ingest the second batch into the Hudi CopyOnWrite table:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
```
Check the HDFS directory:

```
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
```
Ingest the second batch into the Hudi MergeOnRead table:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
```
Check the HDFS directory:

```
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_mor
```
Start spark-shell:

```
$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4
```
```
Spark context Web UI available at http://adhoc-2:4040
Spark context available as 'sc' (master = local[2], app id = local-1644571477181).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

# 1. Snapshot query against the CopyOnWrite table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:59:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 2. Incremental query against the CopyOnWrite table
scala> import org.apache.hudi.DataSourceReadOptions

scala> val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211064632000").load("/user/hive/warehouse/stock_ticks_cow")

scala> hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 3. Read-optimized query against the MergeOnRead table
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# 4. Snapshot query against the MergeOnRead table
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:59:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 5. Incremental query against the MergeOnRead table
scala> val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211064632000").load("/user/hive/warehouse/stock_ticks_mor")

scala> hoodieIncViewDF.registerTempTable("stock_ticks_mor_incr_tmp1")

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_incr_tmp1 where symbol = 'GOOG'").show(100, false);
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+
```
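The begin instant time used above (20220211064632000) was picked by hand from the commit timeline. Below is a minimal sketch of how such a value could be derived inside the same spark-shell session; choosing the second-newest commit is just an example so that only the latest batch is returned.

```scala
import org.apache.hudi.DataSourceReadOptions

// Collect the distinct commit instants already present in the table, newest first
val commits = spark.sql("select distinct(`_hoodie_commit_time`) as commitTime from stock_ticks_cow order by commitTime desc")
  .collect()
  .map(_.getString(0))

// Read only the records written after the second-newest commit, i.e. the latest batch
// (assumes the table already has at least two commits, as in this demo)
val beginInstant = commits(1)

val incrDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstant)
  .load("/user/hive/warehouse/stock_ticks_cow")
```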
Note that for the MergeOnRead table, the read-optimized query and the snapshot query return different results.
Read-optimized query:
```
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|   <<<<<<<<<<<<<<<<<<
+-------------------+------+-------------------+------+---------+--------+
```
Snapshot query:
```
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|   <<<<<<<<<<<<<<<<<<
+-------------------+------+-------------------+------+---------+--------+
```
This illustrates the difference between the two query types: since compaction was disabled during ingestion (--disable-compaction), the second batch exists only in the delta log files. The read-optimized query reads only the compacted base Parquet files and therefore still returns the 10:29 record, while the snapshot query merges the log files in at read time and returns the 10:59 record.