更新时间:2025-12-26 GMT+08:00
分享

Hudi Payload操作说明

Payload介绍

Payload是Hudi实现数据增量更新和删除的关键,它可以帮助Hudi在数据湖中高效地管理数据变更。Hudi Payload的格式是基于Apache Avro的,它使用了Avro的schema来定义数据的结构和类型。Payload可以被序列化和反序列化,以便在Hudi中进行数据的读取和写入。总之,Hudi Payload是Hudi的一个重要组成部分,它提供了一种可靠的、高效的、可扩展的方式来管理大规模数据湖中的数据变更。

  • 增量更新:Payload允许Hudi在数据湖中高效地进行增量更新。当数据发生变化时,Hudi可以只更新或插入发生变化的数据,而不是重新写入整个数据集。
  • 删除操作:Payload支持逻辑删除和物理删除,确保数据的完整性和一致性。逻辑删除通过标记数据为已删除,而物理删除则实际移除数据。

约束与限制

本章节仅适用于MRS 3.3.0及之后版本。

常用Payload

  • DefaultHoodieRecordPayload

    Hudi中默认使用DefaultHoodieRecordPayload,该Payload通过比较增量数据与存量数据的preCombineField字段值的大小来决定同主键的存量数据是否能被同主键的增量数据更新。在同主键的增量数据的preCombineField字段值绝对大于同主键的存量数据的preCombineField字段值时,同主键的增量数据将会被更新。

  • OverwriteWithLatestAvroPayload

    该Payload保证同主键的增量数据永远都会更新至同主键的存量数据中。

  • PartialUpdateAvroPayload

    该Payload继承了OverwriteNonDefaultsWithLatestAvroPayload,它可以保证在任何场景下增量数据中的null值不会覆盖存量数据。

使用Payload示例

  • 使用Spark SQL创建Hudi表并指定Payload。例如指定Payload类为OverwriteWithLatestAvroPayload,示例如下:
    create table hudi_test(id int, comb int, price string, name string, par string) using hudi options(
    primaryKey = "id", 
    preCombineField = "comb", 
    payloadClass="org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") partitioned by (par);
  • 使用Datasource方式写入时指定Payload。例如指定Payload类为DefaultHoodieRecordPayload,示例如下:
    data.write.format("hudi").
    option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).
    option("hoodie.datasource.write.precombine.field", "comb").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.datasource.write.partitionpath.field", "par").
    option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator").
    option("hoodie.datasource.write.operation", "upsert").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "par").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.table", "hudi_test").
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    option("hoodie.upsert.shuffle.parallelism", 4).
    option("hoodie.datasource.write.hive_style_partitioning", "true").
    option("hoodie.table.name", "hudi_test").mode(Append).save(s"/tmp/hudi_test")

相关文档