Updated on 2022-12-14 GMT+08:00

Batch Write

Hudi provides multiple write modes. For details, see the configuration item hoodie.datasource.write.operation. This section describes upsert, insert, and bulk_insert.

  • insert: The operation process is similar to upsert. The query on updated file partitions is not based on indexes. Therefore, insert is faster than upsert. This operation is recommended for data sources that do not contain updated data. If the data source contains updated data, duplicate data will exist in the data lake.
  • bulk_insert (insert in batches): It is used for initial dataset loading. This operation sorts primary keys and then inserts data into a Hudi table by writing data to a common Parquet table. It has the best performance but cannot control small files. The upsert and insert operations can control small files by using heuristics.
  • upsert (insert and update): It is the default operation type. Hudi determines whether historical data exists based on the primary key. Historical data is updated, and other data is inserted. This operation is recommended for data sources, such as change data capture (CDC), that include updated data.
  • Primary keys are not sorted during insert. Therefore, you are not advised to use insert during dataset initialization.
  • You are advised to use insert if data is new, use upsert if data needs to be updated, and use bulk_insert if datasets need to be initialized.

Example:

df.write.format("hudi").
        option(PRECOMBINE_FIELD_OPT_KEY, "col4").// Specify the pre-combined field, which must be sortable.
        option(RECORDKEY_FIELD_OPT_KEY, "primary_key"). // Specify the primary key of the Hudi table. The primary key must be unique.
        option(PARTITIONPATH_FIELD_OPT_KEY, "col0").// Specify a partition.
        option(OPERATION_OPT_KEY, "bulk_insert").// Specify that the operation is bulk_insert.
        option("hoodie.bulkinsert.shuffle.parallelism", par.toString).// Specify the concurrency of the bulk_insert operation.
        option(HIVE_SYNC_ENABLED_OPT_KEY, "true").// Specify the synchronization of the Hudi table to Hive.
        option(HIVE_PARTITION_FIELDS_OPT_KEY, "col0").// Specify the Hive partition column name.
        option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
        option(HIVE_DATABASE_OPT_KEY, db).
        option(HIVE_TABLE_OPT_KEY, tableName).
        option(HIVE_USE_JDBC_OPT_KEY, "false").// Specify whether to use JDBC for Hive synchronization. The default value is true.
        option(TABLE_NAME, tableName). // Specify the table name.
	mode(Overwrite). // Specify the write mode.
	save(s"/tmp/${db}/${tableName}")// Specify the storage path of the Hudi table.
  • If the Spark DataSource API is used to update the MOR table, small files of the updated data may be merged when a small volume of data is upserted. As a result, some updated data can be found in the read-optimized view of the MOR table.
  • If the base file of the data to be updated is a small file, the data to be inserted and new data for update are merged with the base file to generate a new base file instead of being written to logs.

Configuring Partitions

Hudi supports multiple partitioning modes, such as multi-level partitioning, non-partitioning, single-level partitioning, and partitioning by date. You can select a proper partitioning mode as required. The following describes how to configure different partitioning modes for Hudi.

  • Multi-level partitioning

    Multi-level partitioning indicates that multiple fields are specified as partition keys. Pay attention to the following configuration items:

    Configuration Item

    Description

    hoodie.datasource.write.partitionpath.field

    Configure multiple partition fields, for example, p1, p2, and p3.

    hoodie.datasource.hive_sync.partition_fields

    Set this parameter to p1, p2, and p3. The values must be the same as the partition fields of hoodie.datasource.write.partitionpath.field.

    hoodie.datasource.write.keygenerator.class

    Set this parameter to org.apache.hudi.keygen.ComplexKeyGenerator.

    hoodie.datasource.hive_sync.partition_extractor_class

    Set this parameter to org.apache.hudi.hive.MultiPartKeysValueExtractor.

  • Non-partitioning

    Hudi supports non-partitioned tables. Pay attention to the following configuration items:

    Configuration Item

    Description

    hoodie.datasource.write.partitionpath.field

    Leave this parameter blank.

    hoodie.datasource.hive_sync.partition_fields

    Leave this parameter blank.

    hoodie.datasource.write.keygenerator.class

    Set this parameter to org.apache.hudi.keygen.NonpartitionedKeyGenerator.

    hoodie.datasource.hive_sync.partition_extractor_class

    Set this parameter to org.apache.hudi.hive.NonPartitionedExtractor.

  • Single-level partitioning

    It is similar to multi-level partitioning. Pay attention to the following configuration items:

    Configuration Item

    Description

    hoodie.datasource.write.partitionpath.field

    Set this parameter to one field, for example, p.

    hoodie.datasource.hive_sync.partition_fields

    Set this parameter to p.

    The value must be the same as the partition field of

    hoodie.datasource.write.partitionpath.field

    hoodie.datasource.write.keygenerator.class

    (Optional) The default value is org.apache.hudi.keygen.SimpleKeyGenerator.

    hoodie.datasource.hive_sync.partition_extractor_class

    Set this parameter to org.apache.hudi.hive.MultiPartKeysValueExtractor.

  • Partitioning by date

    The date field is specified as the partition field. Pay attention to the following configuration items:

    Configuration Item

    Description

    hoodie.datasource.write.partitionpath.field

    Set this parameter to the date field, for example, operationTime.

    hoodie.datasource.hive_sync.partition_fields

    Set this parameter to operationTime. The value must be the same as the preceding partition field.

    hoodie.datasource.write.keygenerator.class

    (Optional) The default value is org.apache.hudi.keygen.SimpleKeyGenerator.

    hoodie.datasource.hive_sync.partition_extractor_class

    Set this parameter to org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor.

    Date format for SlashEncodedDayPartitionValueExtractor must be yyyy/mm/dd.

  • Partition sorting

    Configuration Item

    Description

    hoodie.bulkinsert.user.defined.partitioner.class

    Specifies the partition sorting class. You can customize a sorting method. For details, see the sample code.

    By default, bulk_insert sorts data by character and applies only to primary keys of StringType.