
Using Hudi Payload

This section applies only to MRS 3.3.0 or later.

Introduction to Payload

Payload is the mechanism Hudi uses to implement incremental data updates and deletions; it helps Hudi manage data changes in the data lake efficiently. Hudi payloads are based on Apache Avro: an Avro schema defines the data structure and types, and payload records can be serialized and deserialized so that Hudi can read and write them. In short, the payload is a core part of Hudi, providing a reliable, efficient, and scalable way to manage data changes in a large-scale data lake.
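
As a concrete illustration of the Avro basis, a Hudi record is an Avro record whose structure and types come from an Avro schema; payloads wrap such records and serialize and compare them when merging. The schema and values in this sketch are hypothetical:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData

    // Hypothetical Avro schema describing one record of a Hudi table.
    val schema = new Schema.Parser().parse(
      """{"type": "record", "name": "hudi_test", "fields": [
        |  {"name": "id", "type": "int"},
        |  {"name": "comb", "type": "int"},
        |  {"name": "price", "type": "string"}]}""".stripMargin)

    // A payload wraps an Avro record like this one and serializes it with the schema;
    // merge logic (e.g. DefaultHoodieRecordPayload) compares field values such as comb.
    val record = new GenericData.Record(schema)
    record.put("id", 1)
    record.put("comb", 2)
    record.put("price", "10.0")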

Typical Payloads

  • DefaultHoodieRecordPayload

    By default, Hudi uses DefaultHoodieRecordPayload. For records with the same primary key, this payload compares the value of the preCombineField field in the incoming (incremental) record with that in the existing record, and updates the existing record only when the incoming record carries the greater preCombineField value (see the sketch after this list).

  • OverwriteWithLatestAvroPayload

    This payload ensures that an incoming record always overwrites the existing record with the same primary key, regardless of the preCombineField values.

  • PartialUpdateAvroPayload

    This payload inherits from OverwriteNonDefaultsWithLatestAvroPayload and ensures that null values in incoming data never overwrite existing data, in any scenario.
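
To see the behavior concretely, the following sketch upserts two records with the same primary key through the Spark Datasource API. The table name, path, and values are hypothetical; with DefaultHoodieRecordPayload the second write is discarded because its comb value is smaller, whereas OverwriteWithLatestAvroPayload would accept it unconditionally.

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("payload-demo").getOrCreate()
    import spark.implicits._

    // Upsert helper writing to a hypothetical demo table with DefaultHoodieRecordPayload.
    def upsert(df: DataFrame, mode: SaveMode): Unit = {
      df.write.format("hudi").
        option("hoodie.datasource.write.precombine.field", "comb").
        option("hoodie.datasource.write.recordkey.field", "id").
        option("hoodie.datasource.write.partitionpath.field", "par").
        option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
        option("hoodie.datasource.write.operation", "upsert").
        option("hoodie.table.name", "payload_demo").
        mode(mode).save("/tmp/payload_demo")
    }

    // Seed the table with one record whose comb value is 2.
    upsert(Seq((1, 2, "10.0", "a1", "par1")).toDF("id", "comb", "price", "name", "par"), SaveMode.Overwrite)
    // Same primary key, but comb = 1 < 2: DefaultHoodieRecordPayload keeps the stored
    // record, so this write has no effect; OverwriteWithLatestAvroPayload would replace it.
    upsert(Seq((1, 1, "9.0", "a2", "par1")).toDF("id", "comb", "price", "name", "par"), SaveMode.Append)

    // Still shows the record with comb = 2.
    spark.read.format("hudi").load("/tmp/payload_demo").select("id", "comb", "price", "name", "par").show()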

Using Payload

  • Specify a Payload during Spark table creation.
    create table hudi_test(id int, comb int, price string, name string, par string) using hudi options(
      primaryKey = "id",
      preCombineField = "comb",
      payloadClass = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
    partitioned by (par);
  • Specify a Payload when writing data in Datasource mode.
    // Imports for the table-type constant and the Append save mode used below
    import org.apache.hudi.DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL
    import org.apache.spark.sql.SaveMode.Append

    data.write.format("hudi").
      // Copy-on-write table type
      option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).
      option("hoodie.datasource.write.precombine.field", "comb").
      option("hoodie.datasource.write.recordkey.field", "id").
      option("hoodie.datasource.write.partitionpath.field", "par").
      // Payload class deciding how records with the same primary key are merged
      option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
      option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator").
      option("hoodie.datasource.write.operation", "upsert").
      option("hoodie.datasource.hive_sync.enable", "true").
      option("hoodie.datasource.hive_sync.partition_fields", "par").
      option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
      option("hoodie.datasource.hive_sync.table", "hudi_test").
      option("hoodie.datasource.hive_sync.use_jdbc", "false").
      option("hoodie.upsert.shuffle.parallelism", 4).
      option("hoodie.datasource.write.hive_style_partitioning", "true").
      option("hoodie.table.name", "hudi_test").
      mode(Append).
      save("/tmp/hudi_test")