Hudi Table Streaming Writes
Configurations
Parameter | Description | Recommended Value | Mandatory
--- | --- | --- | ---
connector | Type of the table to be written. | hudi | Yes
path | Path for storing the table. | Set this parameter as needed. | Yes
table.type | Hudi table type. The options are COPY_ON_WRITE and MERGE_ON_READ. | COPY_ON_WRITE | Yes
hoodie.datasource.write.recordkey.field | Primary key of the table. | Set this parameter as needed. | Yes
write.precombine.field | Data combination field. | Set this parameter as needed. | Yes
write.tasks | Write task parallelism. The default value is 4. | 4 | No
index.bootstrap.enabled | Do not configure this parameter when the Bucket index is used. Flink uses in-memory indexes, so the primary keys of existing data must be loaded into memory to ensure that data in the target table is unique. Set this parameter to prevent data duplication. The default value is true. | true | No
write.index_bootstrap.tasks | This parameter is valid only when index.bootstrap.enabled is enabled. Increase the number of tasks to improve the startup speed. The default value is the default parallelism of the environment. | - | No
index.state.ttl | Duration for storing index data, in days. The default value is 0, indicating that the index data is permanently valid. | - | No
hoodie.datasource.write.keygenerator.type | Primary key generator type of the upstream table. The options include SIMPLE and COMPLEX. | COMPLEX | No
compaction.delta_commits | Number of delta commits that triggers a compaction plan for MOR tables. The default value is 5. | 200 | No
compaction.async.enabled | Whether to enable online compaction in the Flink job. Set this parameter to false and run compaction asynchronously with Spark SQL to improve write performance. | false | No
clean.async.enabled | Whether to clean up old commits immediately upon new commits. This function is enabled by default. | false | No
clean.retain_commits | Number of commits to retain. The default value is 30. | - | No
hoodie.archive.automatic | Whether the archive table service is invoked immediately after each commit. | false | No
archive.min_commits | Minimum number of commits to retain before older commits are archived to the sequential log. The default value is 40. | 500 | No
archive.max_commits | Maximum number of commits to retain before older commits are archived to the sequential log. The default value is 50. | 600 | No
hive_sync.enable | Whether to synchronize table information to Hive. | true | No
hive_sync.metastore.uris | Hive Metastore URI. | Set this parameter as needed. | No
hive_sync.jdbc_url | Hive JDBC URL. | Set this parameter as needed. | No
hive_sync.table | Hive table name. | Set this parameter as needed. | No
hive_sync.db | Hive database name. | Set this parameter as needed. | No
hive_sync.support_timestamp | Whether to support timestamps. | true | No
changelog.enabled | Whether to write changelog messages. The options are true (changelog messages are written) and false (changelog messages are not written). | false | No
hoodie.datasource.write.hive_style_partitioning | Whether to use Hive-style partition paths. The options are true (partition directories use the partition_key=partition_value format) and false. | - | No
filter.delete.record.enabled | Whether to filter delete messages. If changelog is disabled, upstream delete messages cannot be written to the Hudi table. | true | No
delete.empty.instant.ttl | If no data is written to an instant and the TTL of the instant exceeds the configured value (unit: ms), the instant is deleted and a new instant is created. The default value is 5 minutes. The value -1 indicates that this function is disabled. | 10000 | No
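The following Flink SQL job is a minimal sketch of how the parameters above fit together for a streaming write, assuming a Kafka source. The source table, topic, broker address, field list, storage path, and Hive Metastore URI are hypothetical placeholders and must be adapted to the actual environment.

```sql
-- Hypothetical Kafka source; topic, broker address, and fields are placeholders.
CREATE TABLE kafka_source (
  id INT,
  comb INT,
  col0 INT,
  par DATE
) WITH (
  'connector' = 'kafka',
  'topic' = 'demo_topic',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'demo_group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Hudi sink using the parameters described in the table above.
-- The path and Hive sync settings are placeholders; compaction, cleaning,
-- and archiving are disabled here so they can run as separate Spark tasks.
CREATE TABLE hudi_sink (
  id INT,
  comb INT,
  col0 INT,
  par DATE
) PARTITIONED BY (par) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi_sink',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'comb',
  'write.tasks' = '4',
  'compaction.async.enabled' = 'false',
  'compaction.delta_commits' = '5',
  'clean.async.enabled' = 'false',
  'hoodie.archive.automatic' = 'false',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',  -- standard Hudi option, not listed in the table above
  'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083',
  'hive_sync.db' = 'default',
  'hive_sync.table' = 'hudi_sink'
);

-- Continuously write the Kafka stream into the Hudi table.
INSERT INTO hudi_sink SELECT id, comb, col0, par FROM kafka_source;
```

With compaction, cleaning, and archiving disabled in the Flink job, these table services are expected to be run by separate Spark tasks, as described in Development Suggestions below.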
Development Suggestions
- Table names must meet the Hive requirements, for example, my_table, customer_info, and sales_data. A table name:
  - Must start with a letter or underscore (_) and cannot start with a digit.
  - Can contain only letters, digits, underscores (_), and dots (.).
  - Can contain a maximum of 128 characters.
  - Cannot contain spaces or special characters, such as colons (:), semicolons (;), and slashes (/).
  - Is case insensitive. Lowercase letters are recommended.
  - Cannot be a Hive reserved keyword, such as select, from, or where.
- Use Spark SQL to create Hudi tables in a unified manner. The following is an example:
  create table hudi_mor_par_ddl (
    id int,
    comb int,
    col0 int,
    col1 bigint,
    col2 float,
    col3 double,
    col4 decimal(30, 10),
    col5 string,
    col6 date,
    col7 timestamp,
    col8 boolean,
    col9 binary,
    par date
  ) using hudi
  partitioned by (par)
  options (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'comb',
    hoodie.index.type = 'BUCKET'
  );
- Use Spark asynchronous tasks to compact Hudi tables. The following are examples for reference only:
Add the following parameters in the Flink job:
  'compaction.async.enabled' = 'false',
  'compaction.delta_commits' = '5',
  'clean.async.enabled' = 'false',
  'hoodie.archive.automatic' = 'false',
Example SparkSQL commands are as follows:
  set hoodie.clean.automatic = true;
  set hoodie.clean.async = false;
  set hoodie.cleaner.commits.retained = 10;
  set hoodie.compact.inline = true;
  set hoodie.run.compact.only.inline = true;
  set hoodie.keep.min.commits = 500;
  set hoodie.keep.max.commits = 600;
  run compaction on tableName;
  run archivelog on tableName;
- Impact of DDL changes on stream writing of Hudi tables
  DDL changes (such as adding a column, changing a column type or name, and deleting a column) affect writes to Hudi tables. Stop the write jobs before making such changes, as shown in the sketch below.
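The following sketch illustrates this workflow, assuming the schema change is applied with Spark SQL; the table and column names are hypothetical placeholders.

```sql
-- 1. Stop the Flink streaming write job first.
-- 2. Apply the DDL change with Spark SQL (hypothetical table and column names).
ALTER TABLE hudi_mor_par_ddl ADD COLUMNS (col10 string);
-- 3. Update the Flink sink table definition to match the new schema,
--    then restart the streaming write job.
```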