In the previous post, "HUDI preCombinedField summary", I already went over preCombinedField once. My understanding of the source code at the time was not deep enough, so that analysis was incomplete; now that I understand the source better, this post supplements the earlier summary.
The earlier summary concluded:
DF: the write always overwrites the existing record, regardless of whether the new record's ts value is greater than the historical record's ts value.
SQL: the record is updated only when the new ts value is greater than or equal to the historical ts value; if it is smaller, no update happens.
Here is why. First, for Spark SQL the default PAYLOAD_CLASS_NAME is ExpressionPayload, and ExpressionPayload extends DefaultHoodieRecordPayload:
class ExpressionPayload(record: GenericRecord, orderingVal: Comparable[_]) extends DefaultHoodieRecordPayload(record, orderingVal) {
needUpdatingPersistedRecord in DefaultHoodieRecordPayload is what compares the incoming value against the historical one; its implementation is analyzed below.
For Spark DF in Hudi 0.9.0, however, the default PAYLOAD_CLASS_NAME is OverwriteWithLatestAvroPayload, which is the parent class of DefaultHoodieRecordPayload and does not compare against the historical value.
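For example (a minimal sketch, assuming a spark-shell session with `spark` in scope; the table name, path and columns are made up for illustration), a DF write can opt into the same compare-with-history behavior as Spark SQL simply by switching the payload class:

import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq(("id_1", 2L, "a", "2021-09-01")).toDF("id", "ts", "name", "dt")

df.write.format("hudi")
  .option("hoodie.table.name", "test_tbl")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.operation", "upsert")
  // DF default in 0.9.0 is OverwriteWithLatestAvroPayload; switching the payload class to
  // DefaultHoodieRecordPayload makes the merge compare ts with the stored record,
  // i.e. the same ">=" behavior as Spark SQL
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/test_tbl")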
Let's walk through the source code. First, the configuration key for the historical comparison field is:
HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.ordering.field"
Its default value is ts. So the ordering field and preCombineField are not the same thing, but because their defaults are identical and both are implemented in the PAYLOAD_CLASS, they feel like one concept, which is why they are summarized together here.
When Hudi merges small files during an upsert, it reaches the write method of HoodieMergeHandle:
/**
 * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
 */
public void write(GenericRecord oldRecord) {
  // key of the existing (historical) record
  String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
  boolean copyOldRecord = true;
  if (keyToNewRecords.containsKey(key)) {
    // If the incoming records contain this key, run the merge logic.
    // If we have duplicate records that we are updating, then the hoodie record will be deflated after
    // writing the first record. So make a copy of the record to be merged
    HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
    try {
      // This calls combineAndGetUpdateValue of the PAYLOAD_CLASS
      Option<IndexedRecord> combinedAvroRecord =
          hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
              useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
              config.getPayloadConfig().getProps());

      if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
        // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
        copyOldRecord = true;
      } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
        /*
         * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
         * write the combined new value
         *
         * We no longer need to copy the old record over.
         */
        copyOldRecord = false;
      }
      writtenRecordKeys.add(key);
    } catch (Exception e) {
      throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
          + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
    }
  }

  if (copyOldRecord) {
    // this should work as it is, since this is an existing record
    try {
      fileWriter.writeAvro(key, oldRecord);
    } catch (IOException | RuntimeException e) {
      String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
          key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
      LOG.debug("Old record is " + oldRecord);
      throw new HoodieUpsertException(errMsg, e);
    }
    recordsWritten++;
  }
}
Now look at combineAndGetUpdateValue in DefaultHoodieRecordPayload:
/**
 * currentValue is the value currently on storage, i.e. the historical record; it is passed in as:
 *   Option<IndexedRecord> combinedAvroRecord =
 *       hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
 *           useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
 *           config.getPayloadConfig().getProps());
 */
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
  // recordBytes holds the bytes of the incoming record
  if (recordBytes.length == 0) {
    return Option.empty();
  }

  // Deserialize recordBytes into an Avro GenericRecord
  GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

  // Null check is needed here to support schema evolution. The record in storage may be from old schema where
  // the new ordering column might not be present and hence returns null.
  // If the persisted record does not need updating, return the historical value.
  if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
    return Option.of(currentValue);
  }

  /*
   * We reached a point where the value on disk is older than the incoming record.
   */
  eventTime = updateEventTime(incomingRecord, properties);

  /*
   * Now check if the incoming record is a delete record.
   */
  return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
}
recordBytes is assigned in the parent class BaseAvroPayload. When writing data we first build a GenericRecord record, pass that record to the payload constructor, then build a List<HoodieRecord> and call HoodieJavaWriteClient.upsert(List<HoodieRecord> records, String instantTime); see the sketch after the constructor below.
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
  this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
  this.orderingVal = orderingVal;
  if (orderingVal == null) {
    throw new HoodieException("Ordering value is null for record: " + record);
  }
}
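To illustrate that flow (a hedged sketch, not from the original post: the Avro schema, key, partition path and write-client setup are all assumptions, and the client construction itself is omitted), building a record, wrapping it in the two-argument payload and handing a List<HoodieRecord> to HoodieJavaWriteClient.upsert looks roughly like this:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hudi.common.model.{HoodieKey, HoodieRecord, OverwriteWithLatestAvroPayload}

// Assumed Avro schema with id / ts / name fields
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"rec","fields":[
    |{"name":"id","type":"string"},
    |{"name":"ts","type":"long"},
    |{"name":"name","type":"string"}]}""".stripMargin)

// 1. Build the GenericRecord
val record: GenericRecord = new GenericData.Record(schema)
record.put("id", "id_1")
record.put("ts", 2L)
record.put("name", "hudi")

// 2. Pass the record plus the preCombineField (ts) value as orderingVal to the payload
val orderingVal = record.get("ts").asInstanceOf[Comparable[_]]
val payload = new OverwriteWithLatestAvroPayload(record, orderingVal)

// 3. Build the List[HoodieRecord] and upsert it
val hoodieRecord = new HoodieRecord(new HoodieKey("id_1", "dt=2021-09-01"), payload)
val records = java.util.Collections.singletonList(hoodieRecord)

// `client` would be an already-configured HoodieJavaWriteClient; its construction
// (engine context + HoodieWriteConfig) is omitted from this sketch.
// val instantTime = client.startCommit()
// client.upsert(records, instantTime)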
The comparison with the historical value happens here:
protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
                                              IndexedRecord incomingRecord, Properties properties) {
  /*
   * Combining strategy here returns currentValue on disk if incoming record is older.
   * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
   * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
   * in disk is returned (to be rewritten with new commit time).
   *
   * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
   * and need to be dealt with separately.
   */
  // ordering (ts) value of the historical record
  Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
      properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
  // ordering (ts) value of the incoming record
  Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
      properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), false);
  // If the historical value is null, or is less than or equal to the new value, return true,
  // meaning the historical record should be overwritten; otherwise do not update.
  return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
Note that in HoodieMergeHandle above, the properties passed in are config.getPayloadConfig().getProps(). getPayloadConfig returns a HoodiePayloadConfig, and HoodiePayloadConfig defines the default value of PAYLOAD_ORDERING_FIELD_PROP_KEY as ts. Also note the comparison is compareTo(incomingOrderingVal) <= 0, which is exactly why, with Spark SQL, an incoming ts equal to the stored ts still triggers an update.
public HoodiePayloadConfig getPayloadConfig() {
  return hoodiePayloadConfig;
}

public class HoodiePayloadConfig extends HoodieConfig {

  public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
      .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
      .defaultValue("ts")
      .withDocumentation("Table column/field name to order records that have the same key, before "
          + "merging and writing to storage.");
Now for the precombine itself. The precombine implementation is OverwriteWithLatestAvroPayload.preCombine:
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
    implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

  public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
    if (oldValue.recordBytes.length == 0) {
      // use natural order for delete record
      return this;
    }
    // If the old value's orderingVal is greater than the new one, return the old value;
    // otherwise return the new value, i.e. keep the record with the larger ordering value.
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
      // pick the payload with greatest ordering value
      return oldValue;
    } else {
      return this;
    }
  }
So both Spark SQL and Spark DF support precombine by default: ExpressionPayload and DefaultHoodieRecordPayload both extend OverwriteWithLatestAvroPayload, so any of these three payload classes can perform the precombine; the key is how the payload is constructed.
From the code above, OverwriteWithLatestAvroPayload has two constructors, one taking a single argument and one taking two. The single-argument constructor cannot support precombine, because preCombine needs orderingVal for the comparison, so the two-argument constructor must be used, where orderingVal is the value of the preCombineField and record is the row. Both Spark SQL and Spark DF eventually call HoodieSparkSqlWriter.write, and that write method is where the payload is constructed.
// Convert to RDD[HoodieRecord]
// First convert the DataFrame to RDD[GenericRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
  org.apache.hudi.common.util.Option.of(schema))
// Decide whether precombine is needed
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
  operation.equals(WriteOperationType.UPSERT) ||
  parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
    HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
  val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
  val hoodieRecord = if (shouldCombine) { // precombine is needed
    // Extract the PRECOMBINE_FIELD value from the record; if it is missing an exception is thrown,
    // because the precombine field must not be null
    val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
      .asInstanceOf[Comparable[_]]
    // Then construct the payload configured by PAYLOAD_CLASS_NAME via reflection
    DataSourceUtils.createHoodieRecord(processedRecord,
      orderingVal, keyGenerator.getKey(gr),
      hoodieConfig.getString(PAYLOAD_CLASS_NAME))
  } else {
    // If precombine is not needed, the payload is still constructed via reflection, but without orderingVal
    DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr),
      hoodieConfig.getString(PAYLOAD_CLASS_NAME))
  }
  hoodieRecord
}).toJavaRDD()
As the comments in the source above show, when precombine is required, the PRECOMBINE_FIELD value orderingVal is first extracted from the record and then the payload is constructed, i.e.
new OverwriteWithLatestAvroPayload(record, orderingVal)
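As a side illustration (a hedged sketch; the schema, key and values are made up), constructing two such payloads for the same key and calling preCombine directly shows that the one with the larger orderingVal wins:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload

val schema = new Schema.Parser().parse(
  """{"type":"record","name":"rec","fields":[
    |{"name":"id","type":"string"},{"name":"ts","type":"long"}]}""".stripMargin)

def payloadFor(id: String, ts: Long): OverwriteWithLatestAvroPayload = {
  val r: GenericRecord = new GenericData.Record(schema)
  r.put("id", id)
  r.put("ts", ts)
  // orderingVal = the preCombineField (ts) value, as in HoodieSparkSqlWriter
  new OverwriteWithLatestAvroPayload(r, r.get("ts").asInstanceOf[Comparable[_]])
}

val older = payloadFor("id_1", 1L)
val newer = payloadFor("id_1", 2L)

// preCombine keeps the payload with the larger ordering value,
// so both calls below return the ts = 2 payload
assert(newer.preCombine(older) eq newer)
assert(older.preCombine(newer) eq newer)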
With the payload constructed, where in the actual write path is the precombine applied?
Take the upsert of a COW table as an example, i.e. HoodieJavaCopyOnWriteTable.upsert:
// HoodieJavaCopyOnWriteTable
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                     String instantTime,
                                                     List<HoodieRecord<T>> records) {
  return new JavaUpsertCommitActionExecutor<>(context, config,
      this, instantTime, records).execute();
}

// JavaUpsertCommitActionExecutor
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
  return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
      config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
}

// AbstractWriteHelper
public HoodieWriteMetadata<O> write(String instantTime,
                                    I inputRecords,
                                    HoodieEngineContext context,
                                    HoodieTable<T, I, K, O> table,
                                    boolean shouldCombine,
                                    int shuffleParallelism,
                                    BaseCommitActionExecutor<T, I, K, O, R> executor,
                                    boolean performTagging) {
  try {
    // De-dupe/merge if needed
    I dedupedRecords =
        combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

    Instant lookupBegin = Instant.now();
    I taggedRecords = dedupedRecords;
    if (performTagging) {
      // perform index loop up to get existing location of records
      taggedRecords = tag(dedupedRecords, context, table);
    }
    Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

    HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
    result.setIndexLookupDuration(indexLookupDuration);
    return result;
  } catch (Throwable e) {
    if (e instanceof HoodieUpsertException) {
      throw (HoodieUpsertException) e;
    }
    throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
  }
}

public I combineOnCondition(
    boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
  return condition ? deduplicateRecords(records, table, parallelism) : records;
}

/**
 * Deduplicate Hoodie records, using the given deduplication function.
 *
 * @param records hoodieRecords to deduplicate
 * @param parallelism parallelism or partitions to be used while reducing/deduplicating
 * @return Collection of HoodieRecord already be deduplicated
 */
public I deduplicateRecords(
    I records, HoodieTable<T, I, K, O> table, int parallelism) {
  return deduplicateRecords(records, table.getIndex(), parallelism);
}

// SparkWriteHelper
@Override
public JavaRDD<HoodieRecord<T>> deduplicateRecords(
    JavaRDD<HoodieRecord<T>> records, HoodieIndex<T, ?, ?, ?> index, int parallelism) {
  boolean isIndexingGlobal = index.isGlobal();
  return records.mapToPair(record -> {
    HoodieKey hoodieKey = record.getKey();
    // If index used is global, then records are expected to differ in their partitionPath
    // extract the key of the record
    Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
    // emit (key, record)
    return new Tuple2<>(key, record);
  }).reduceByKey((rec1, rec2) -> {
    // For records with the same key, preCombine returns the one with the larger preCombineField value
    @SuppressWarnings("unchecked")
    T reducedData = (T) rec2.getData().preCombine(rec1.getData());
    HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

    return new HoodieRecord<T>(reducedKey, reducedData);
  }, parallelism).map(Tuple2::_2);
}
That is how the precombine is carried out.
Finally, how is the historical comparison field (the ordering field) changed? With Spark SQL and Spark DF you normally do not need to set it at all, because by default it is kept in sync with the preCombineField. Let's see how the code keeps them in sync.
Both SQL and DF eventually call HoodieSparkSqlWriter.write:
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName,
  mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
  .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
                                                     String tblName, Map<String, String> parameters) {
  return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
}

public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName,
                                                   Map<String, String> parameters) {
  boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
  boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
      .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
  boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
  boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE().key()));
  // insert/bulk-insert combining to be true, if filtering for duplicates
  boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
  HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
      .withPath(basePath).withAutoCommit(false).combineInput(combineInserts, true);
  if (schemaStr != null) {
    builder = builder.withSchema(schemaStr);
  }

  return builder.forTable(tblName)
      .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
          .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
          .withInlineCompaction(inlineCompact).build())
      .withClusteringConfig(HoodieClusteringConfig.newBuilder()
          .withInlineClustering(inlineClusteringEnabled)
          .withAsyncClustering(asyncClusteringEnabled).build())
      // Here the payload ordering field is set to the PRECOMBINE_FIELD value,
      // which is why the two are kept in sync by default
      .withPayloadConfig(HoodiePayloadConfig.newBuilder()
          .withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
          .build())
      // override above with Hoodie configs specified as options.
      .withProps(parameters).build();
}
If you really do want to change the default so that it differs from PRECOMBINE_FIELD, then for SQL:
set hoodie.payload.ordering.field=ts;
DF:
.option("hoodie.payload.ordering.field", "ts") 或 .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts")