Hudi Result Table
Function
Flink SQL jobs write data to Hudi tables.
Hudi is a data lake storage format that provides the ability to update and delete data and to consume new data on top of HDFS. It supports multiple compute engines and provides insert, update, and delete (IUD) interfaces as well as incremental pull on HDFS datasets.
| Type | Description |
|---|---|
| Supported Flink table type | Source table and result table |
| Supported Hudi table type | MOR table and COW table |
| Supported read/write type | Batch read, batch write, streaming read, and streaming write |
For details, see Hudi.
Caveats
- You are advised to use SparkSQL for unified table creation.
- The table name must meet the following Hive format requirements:
- Must start with a letter or underscore (_) and cannot start with a digit.
- Can contain only letters, digits, and underscores (_).
- Can contain a maximum of 128 characters.
- Cannot contain spaces or special characters, such as colons (:), semicolons (;), and slashes (/).
- Is case insensitive. Lowercase letters are recommended.
- Cannot be Hive reserved keywords, such as select, from, and where.
Example:
my_table, customer_info, sales_data
- Since the job writes data to the Hudi table only when a checkpoint is triggered, checkpointing must be enabled. Adjust the checkpoint interval according to service requirements; you are advised to set a relatively large interval.
- If the checkpoint interval is too short, data may not be flushed to the Hudi table in time and the job may fail. You are advised to set the checkpoint interval to several minutes.
- Set the number of tolerable checkpoint failures using execution.checkpointing.tolerable-failed-checkpoints. For Flink on Hudi jobs, set it to a larger value, for example, 100, as shown in the sketch below.
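A minimal sketch of these checkpoint settings, assuming they are applied as Flink SQL SET statements (the interval and threshold values are illustrative, not mandated):
SET 'execution.checkpointing.interval' = '5min';                      -- relatively large checkpoint interval
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '100';   -- tolerate more failed checkpoints for Flink on Hudi jobs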
- To use Hive-style partitioning, add the following parameters (the sketch after them shows how they fit into a table definition):
'hoodie.datasource.write.hive_style_partitioning' = 'true'
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
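For example, added to the WITH clause of a Hudi sink (a minimal sketch; the table name, path, schema, and the remaining options are illustrative):
create table hudi_hive_style_sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
);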
- By default, Hudi write statements use the Flink state index. To use the bucket index, add the following parameters to the statement (see the sketch after this list for a complete table definition):
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = 'Number of buckets in each partition of a Hudi table',
'hoodie.bucket.index.hash.field' = 'recordkey.field'
- hoodie.bucket.index.num.buckets: Number of buckets in each partition of the Hudi table. Data in each partition is distributed to buckets by hash. This parameter cannot be changed after it is set during table creation or when data is written for the first time; otherwise, an exception occurs during data updates.
- hoodie.bucket.index.hash.field: Field used to calculate the hash value during bucketing. It must be a subset of the primary key. If this parameter is left blank, the value of recordkey.field (the primary key of the Hudi table) is used by default.
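A minimal sketch of a sink that uses the bucket index; the table name, path, primary key, and the bucket count of 16 are illustrative:
create table hudi_bucket_sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '16',
  'hoodie.bucket.index.hash.field' = 'id'
);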
- When you create a Flink OpenSource SQL job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs.
- Spark executes the compaction plan offline, as well as the clean and archive operations. For more details, see the Hudi data table compaction specifications.
When Flink jobs write to MOR tables, asynchronous compaction is required. Refer to the Hudi official website for parameters controlling the compaction interval.
run compaction on <database name>.<table name>;   -- Execute the compaction plan.
run clean on <database name>.<table name>;        -- Execute the clean operation.
run archivelog on <database name>.<table name>;   -- Execute the archive operation.
Syntax
create table hudiSink (
  attr_name attr_type
  (',' attr_name attr_type)*
)
with (
  'connector' = 'hudi',
  'path' = 'obs://xx',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'xx',
  'write.precombine.field' = 'xx',
  'read.streaming.enabled' = 'true'
  ...
);
Parameter Description
| Parameter | Mandatory | Default Value | Data Type | Description |
|---|---|---|---|---|
| connector | Yes | None | String | Connector type. Set it to hudi. |
| path | Yes | None | String | Path for storing the table. |
| table.type | Yes | COPY_ON_WRITE | String | Hudi table type. The options are COPY_ON_WRITE and MERGE_ON_READ. |
| hoodie.datasource.write.recordkey.field | Yes | None | String | Primary key of the table. |
| hoodie.datasource.write.partitionpath.field | No | None | String | Partition field of the Hudi table. Do not specify this parameter for non-partitioned tables; it must be specified for partitioned tables. |
| write.precombine.field | Yes | None | String | Data combination (pre-combine) field. When records share the same primary key, the record with the larger value of this field takes effect during an update. If this parameter is not set, updates follow the order in which messages are processed internally by the engine. |
| write.payload.class | No | None | String | Defines the data merge logic, that is, how multiple records with the same primary key are handled during merge updates. The default value is OverwriteWithLatestAvroPayload, which overwrites old records with new ones. Other predefined payloads are available, such as DefaultHoodieRecordPayload, OverwriteNonDefaultsWithLatestAvroPayload, and EmptyHoodieRecordPayload. |
| write.tasks | No | 4 | Integer | Parallelism of tasks that write to the Hudi table. The recommended value is 4. |
| index.type | No | INMEMORY | String | Index type. INMEMORY and BUCKET are supported. The default value is INMEMORY. |
| index.bootstrap.enabled | No | true | Boolean | Do not set this parameter when the BUCKET index is used. Flink uses the INMEMORY index by default, which caches the primary keys of the data in memory to ensure that data in the target table is unique. This parameter must be configured to prevent duplicate data. Default value: true. |
| write.index_bootstrap.tasks | No | Default parallelism in the environment | Integer | Valid only when index.bootstrap.enabled is set to true. Increase the number of tasks to improve startup speed. |
| hoodie.bucket.index.num.buckets | No | 5 | Integer | Number of buckets in each partition of the Hudi table. Data in each partition is distributed to buckets by hash. This parameter cannot be changed after it is set during table creation or when data is written for the first time; otherwise, an exception occurs during data updates. |
| hoodie.bucket.index.hash.field | No | recordkey.field | String | Field used to calculate the hash value during bucketing. It must be a subset of the primary key. If this parameter is left blank, the value of recordkey.field (the primary key of the Hudi table) is used by default. |
| index.state.ttl | No | 0 | Integer | Duration (in days) for which index data is retained. The default value 0 indicates that index data is kept permanently. |
| compaction.async.enabled | No | false | Boolean | Whether to enable online compaction. You are advised to disable online compaction to improve performance, but keep compaction.schedule.enabled set to true so that compaction plans are still generated periodically and can be executed offline and asynchronously. |
| clean.async.enabled | No | true | Boolean | For COW tables, set it to true. For MOR tables, when online compaction is disabled (compaction.async.enabled set to false), set this parameter to false and execute cleaning asynchronously offline. You are advised to execute compaction and cleaning together asynchronously. |
| hoodie.archive.automatic | No | true | String | For COW tables, set it to true. For MOR tables, when online compaction is disabled (compaction.async.enabled set to false), set this parameter to false and execute archiving asynchronously offline. You are advised to execute compaction and archiving together asynchronously. |
| compaction.schedule.enabled | No | true | Boolean | Whether to generate compaction plans periodically. You are advised to enable this function even if online compaction is disabled. |
| compaction.delta_commits | No | 5 | Integer | Number of delta commits that triggers generation of a compaction plan for MOR tables. The recommended value is 200. |
| compaction.tasks | No | 4 | Integer | Parallelism of compaction tasks for the Hudi table. You are advised to disable online compaction to improve performance. |
| hive_sync.enable | No | false | Boolean | Whether to synchronize table information to Hive. Set this parameter to true when you need to synchronize the metadata of the Hudi table to the Hive metastore so that the Hudi table can be accessed through Hive query tools or on the data management page of the DLI console. Synchronizing table information to Hive uses catalog-related permissions, so agency permissions for accessing the catalog must also be configured. |
| hive_sync.mode | No | jdbc | Enum | Method for synchronizing metadata from the Hudi table to Hive. Select an appropriate synchronization method as needed, for example, jdbc (default) or hms (used in the example below). |
| hive_sync.table | No | None | String | Name of the Hive table to synchronize to. The metadata of the Hudi table is synchronized to this Hive table. |
| hive_sync.db | No | default | String | Name of the Hive database to synchronize to. The metadata of the Hudi table is synchronized to a table in this Hive database. |
| hive_sync.support_timestamp | No | true | Boolean | Whether to support timestamps. This parameter ensures that timestamp fields are processed correctly when the metadata of the Hudi table is synchronized to Hive. You are advised to set it to true. |
| changelog.enabled | No | false | Boolean | Whether to keep all changes (including intermediate changes). false (default): only the merged final result is written. true: all change messages, including intermediate changes, are kept so that they can be consumed downstream. |
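The compaction, clean, and archive options above are typically set together. Below is a minimal sketch of a MOR sink WITH clause tuned for offline compaction as recommended in the table; the table name, path, schema, and key fields are illustrative:
create table hudi_mor_sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  ts TIMESTAMP(3)
) with (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'ts',
  -- Disable online compaction but keep generating compaction plans,
  -- so that compaction can be executed offline with Spark.
  'compaction.async.enabled' = 'false',
  'compaction.schedule.enabled' = 'true',
  'compaction.delta_commits' = '200',
  -- Run clean and archive offline together with compaction.
  'clean.async.enabled' = 'false',
  'hoodie.archive.automatic' = 'false'
);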
Example: Use the DataGen Connector to Generate Data, Output It to a Hudi MOR Table (Using Order Date as the Partition Field), and Use HMS to Synchronize Metadata to Hive
- Create a Flink OpenSource SQL job. Enter the following job script and submit the job.
When you create a job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs. Change the values of the parameters in bold as needed in the following script.
create table orderSource (
  order_id STRING,
  order_name STRING,
  order_time TIMESTAMP(3)
) with (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

CREATE TABLE huditest (
  order_id STRING PRIMARY KEY NOT ENFORCED,
  order_name STRING,
  order_time TIMESTAMP(3),
  order_date String
) PARTITIONED BY (order_date) WITH (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'order_id',
  'write.precombine.field' = 'order_time',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.table' = 'huditest',
  'hive_sync.db' = 'dbtest'
);

insert into huditest
select
  order_id,
  order_name,
  order_time,
  DATE_FORMAT(order_time, 'yyyyMMdd')
from orderSource;
- Run the following statement in Spark SQL to view the write results:
SELECT * FROM dbtest.huditest where order_date = 'xx'