Updated on 2024-11-29 GMT+08:00

Hudi Table Streaming Writes

Configurations

Table 1 Configuration parameters for Hudi table streaming writes

Parameter

Description

Recommended Value

Mandatory

connector

Type of the table to be written

hudi

Yes

path

Path for storing the table

Set this parameter as needed.

Yes

table.type

Hudi table type. The options are as follows:

  • MERGE_ON_READ
  • COPY_ON_WRITE (default value)

COPY_ON_WRITE

Yes

hoodie.datasource.write.recordkey.field

Primary key of the table

Set this parameter as needed.

Yes

write.precombine.field

Data combination field

Set this parameter as needed.

Yes

write.tasks

Write task parallelism. The default value is 4.

4

No

index.bootstrap.enabled

Do not configure this parameter when the bucket index is used. Flink uses an in-memory index, so the primary keys of the data need to be cached in memory to ensure that data in the target table is unique. Set this parameter to prevent duplicate data. The default value is true.

true

No

write.index_bootstrap.tasks

This parameter is valid only when index.bootstrap.enabled is set to true. Increase the number of tasks to improve the startup speed. The default value is the default parallelism of the environment.

-

No

index.state.ttl

Duration for storing index data. The default value is 0 (unit: day), indicating that the index data is permanently valid.

-

No

hoodie.datasource.write.keygenerator.type

Primary key generator type of the upstream table. The options are as follows:

  • SIMPLE (default value)
  • COMPLEX (Use this value when the table is created by Spark, or set it to the same value specified when the Spark table was created.)
  • TIMESTAMP
  • CUSTOM
  • NON_PARTITION
  • GLOBAL_DELETE

COMPLEX

No

compaction.delta_commits

Number of delta commits that triggers a compaction plan for a MOR table. The default value is 5.

200

No

compaction.async.enabled

Whether to enable asynchronous compaction in the Flink job.

Set this parameter to false and run compaction asynchronously with SparkSQL instead to improve write performance.

false

No

clean.async.enabled

Whether to clear old commits immediately upon new commits. This function is enabled by default.

  • true (default value)
  • false

false

No

clean.retain_commits

Number of commits to retain. The default value is 30.

-

No

hoodie.archive.automatic

Whether the archive table service is invoked immediately after each commit.

  • true (default value)
  • false

false

No

archive.min_commits

Minimum number of commits to be retained before older commits are archived to the sequential log. The default value is 40.

500

No

archive.max_commits

Maximum number of commits to be retained before older commits are archived to the sequential log. The default value is 50.

600

No

hive_sync.enable

Whether to synchronize table information to Hive.

true

No

hive_sync.metastore.uris

Hive Metastore URI

Set this parameter as needed.

No

hive_sync.jdbc_url

Hive JDBC URL

Set this parameter as needed.

No

hive_sync.table

Hive table name

Set this parameter as needed.

No

hive_sync.db

Name of a Hive database

Set this parameter as needed.

No

hive_sync.support_timestamp

Whether to support timestamps

true

No

changelog.enabled

Whether to write Changelog messages. The options are as follows:

  • false (default value): Changelog messages are not written.
  • true: Changelog messages are written for CDC.

false

No

hoodie.datasource.write.hive_style_partitioning

Whether to use Hive style partitioning. The options are as follows:
  • false (default value): The Hive style is not used; only the partition value is used as the partition directory name.
  • true: The Hive style is used. The partition directory name is in the format <partition_column_name>=<partition_value>.

    If the Hudi partitioned table is created by Spark, this parameter is mandatory and must be set to true.

-

No

filter.delete.record.enabled

Whether to filter delete messages.

  • false (default value): Delete messages are not filtered.
  • true: Delete messages are filtered.

If changelog.enabled is false, upstream delete messages cannot be written to the Hudi table.

true

No

delete.empty.instant.ttl

If no data is written to an instant and the TTL of the instant exceeds the configured value (unit: ms), the instant is deleted and a new instant is created. The default value is 5 minutes. The value -1 indicates that this function is disabled.

10000

No
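
The following Flink SQL statements are a minimal sketch of how these parameters can be combined in a streaming write job. The sink schema, the kafka_source table, and the storage path are placeholders for illustration only; Hive synchronization parameters (hive_sync.*) can be added to the same WITH clause as needed.

    create table hudi_streaming_sink (
      id int,
      comb int,
      name string,
      par date,
      primary key (id) not enforced
    ) partitioned by (par) with (
      'connector' = 'hudi',
      -- Placeholder path; set it to the actual storage location of the table.
      'path' = 'hdfs://hacluster/user/hive/warehouse/hudi_streaming_sink',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'id',
      'write.precombine.field' = 'comb',
      'write.tasks' = '4',
      -- Disable compaction, cleaning, and archiving in the Flink job and run them
      -- asynchronously with SparkSQL, as described in Development Suggestions.
      'compaction.async.enabled' = 'false',
      'compaction.delta_commits' = '5',
      'clean.async.enabled' = 'false',
      'hoodie.archive.automatic' = 'false'
    );

    -- Continuously write data from the upstream source (for example, a Kafka source table) into the Hudi table.
    insert into hudi_streaming_sink select id, comb, name, par from kafka_source;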

Development Suggestions

  • Table names must meet the Hive requirements, for example, my_table, customer_info, and sales_data.

    A table name:

    • Must start with a letter or underscore (_) and cannot start with a digit.
    • Can contain only letters, digits, underscores (_), and dots (.).
    • Can contain a maximum of 128 characters.
    • Cannot contain spaces or special characters, such as colons (:), semicolons (;), and slashes (/).
    • Is case insensitive. Lowercase letters are recommended.
    • Cannot be a Hive reserved keyword, such as select, from, or where.
  • Use Spark SQL to create Hudi tables in a unified manner. The following is an example:
    create table hudi_mor_par_ddl (
      id int,
      comb int,
      col0 int,
      col1 bigint,
      col2 float,
      col3 double,
      col4 decimal(30, 10),
      col5 string,
      col6 date,
      col7 timestamp,
      col8 boolean,
      col9 binary,
      par date
    ) using hudi partitioned by(par) options(
      type = 'mor',
      primaryKey = 'id',
      preCombineField = 'comb',
      hoodie.index.type = 'BUCKET'
    );
  • Use Spark asynchronous tasks to compact Hudi tables. The following are examples for reference only:

    Add the following parameters in the Flink job:

      'compaction.async.enabled' = 'false',
      'compaction.delta_commits' = '5',
      'clean.async.enabled' = 'false',
      'hoodie.archive.automatic' = 'false',

    Example SparkSQL commands are as follows:

      set hoodie.clean.automatic = true;
      set hoodie.clean.async = false;
      set hoodie.cleaner.commits.retained = 10;
      set hoodie.compact.inline = true;
      set hoodie.run.compact.only.inline = true;
      set hoodie.keep.min.commits = 500;
      set hoodie.keep.max.commits = 600;
      run compaction on tableName;
      run archivelog on tableName;
  • Impact of DDL changes on stream writing of Hudi tables

    DDL changes (such as adding a column, changing a column type or name, or deleting a column) affect writes to Hudi tables. Stop the streaming write jobs before performing such changes, as shown in the example below.
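
    For reference, a column addition on the example table above might be performed with SparkSQL as follows once the streaming write job has been stopped. The column name col10 is illustrative only, and the ALTER TABLE operations that are supported depend on the Hudi and Spark versions in use.

      -- Stop the Flink streaming write job before running the DDL.
      alter table hudi_mor_par_ddl add columns (col10 string);
      -- Restart the Flink streaming write job after the change is complete.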