更新时间:2024-11-29 GMT+08:00

使用Hudi Payload

Payload介绍

Payload是Hudi实现数据增量更新和删除的关键,它可以帮助Hudi在数据湖中高效的管理数据变更。Hudi Payload的格式是基于Apache Avro的,它使用了Avro的schema来定义数据的结构和类型。Payload可以被序列化和反序列化,以便在Hudi中进行数据的读取和写入。总之,Hudi Payload是Hudi的一个重要组成部分,它提供了一种可靠的、高效的、可扩展的方式来管理大规模数据湖中的数据变更。

常用Payload

  • DefaultHoodieRecordPayload

    Hudi中默认使用DefaultHoodieRecordPayload,该Payload通过比较增量数据与存量数据的preCombineField字段值的大小来决定同主键的存量数据是否能被同主键的增量数据更新。在同主键的增量数据的preCombineField字段值绝对大于同主键的存量数据的preCombineField字段值时,同主键的增量数据将会被更新。

  • OverwriteWithLatestAvroPayload

    该Payload保证同主键的增量数据永远都会更新同主键的增量数据。

  • PartialUpdateAvroPayload

    该Payload继承了OverwriteNonDefaultsWithLatestAvroPayload,它可以保证在任何场景下增量数据中的null值不会覆盖存量数据。

使用Payload

  • Spark建表时指定Payload
    create table hudi_test(id int, comb int, price string, name string, par string) using hudi options(
    primaryKey = "id", 
    preCombineField = "comb", 
    payloadClass="org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") partitioned by (par);
  • Datasource方式写入时指定Payload
    data.write.format("hudi").
    option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).
    option("hoodie.datasource.write.precombine.field", "comb").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.datasource.write.partitionpath.field", "par").
    option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator").
    option("hoodie.datasource.write.operation", "upsert").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "par").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.table", "hudi_test").
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    option("hoodie.upsert.shuffle.parallelism", 4).
    option("hoodie.datasource.write.hive_style_partitioning", "true").
    option("hoodie.table.name", "hudi_test").mode(Append).save(s"/tmp/hudi_test")