
Hudi Source Table

Function

Flink SQL reads data from Hudi tables.

Hudi is a data lake storage format that provides the ability to update and delete data and to consume new data on top of HDFS. It supports multiple compute engines and provides insert, update, and delete (IUD) interfaces as well as incremental pull on HDFS datasets.

Table 1 Supported types

  Supported Flink table type: Source table and result table
  Supported Hudi table type: MOR table and COW table
  Supported read/write type: Batch read, batch write, streaming read, and streaming write

For details, see Hudi.

Caveats

  • Set a traffic limit for streaming read when a Hudi table is used as the source table.

    If the streaming read traffic exceeds what the job can handle, job exceptions may occur. Set the streaming read limit (read.rate.limit) to the peak value verified by the service pressure test (see the sketch after this list).

  • Run compaction on Hudi tables regularly to prevent long checkpointing of the Hudi Source operator.

    If the Hudi Source operator takes a long time to complete a checkpoint, check whether compaction on the Hudi table is running normally. If compaction is not performed for a long time, file listing performance deteriorates.

  • When streaming read is enabled for Hudi MOR tables, enable log indexing to improve Flink streaming read performance.

    Log indexing improves the read and write performance of Hudi MOR tables. Add 'hoodie.log.index.enabled' = 'true' to both the sink table and the source table (see the sketch after this list).

  • When you create a Flink OpenSource SQL job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs.
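
The following sketch shows how the two options above fit into a source table definition. It is a minimal example, not a full recommended configuration; the path, columns, and the rate limit of 10000 records per second are placeholders that you should replace with values verified in your environment.

create table hudiSource (
  order_id STRING PRIMARY KEY NOT ENFORCED,
  order_name STRING,
  order_time TIMESTAMP(3)
) with (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'order_id',
  'write.precombine.field' = 'order_time',
  'read.streaming.enabled' = 'true',
  'read.rate.limit' = '10000',          -- placeholder; use the peak value from your pressure test
  'hoodie.log.index.enabled' = 'true'   -- also add this option to the corresponding sink table
);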

Syntax

create table hudiSource (
  attr_name attr_type
  (',' attr_name attr_type)*
)
with (
  'connector' = 'hudi',
  'path' = 'obs://xx',
  'table.type' = 'xx',
  'hoodie.datasource.write.recordkey.field' = 'xx',
  'write.precombine.field' = 'xx',
  'read.streaming.enabled' = 'xx',
  ...
);
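
As a usage sketch of this syntax, the following bounded incremental (batch) read consumes the commits between two points in time. The read.streaming.start-commit and read.end-commit options are described in Table 2 below; the path, columns, and commit times are placeholders:

create table hudiSourceIncrement (
  order_id STRING PRIMARY KEY NOT ENFORCED,
  order_name STRING,
  order_time TIMESTAMP(3)
) with (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'order_id',
  'write.precombine.field' = 'order_time',
  'read.streaming.enabled' = 'false',                -- batch (bounded) read
  'read.streaming.start-commit' = '20240101000000',  -- closed interval: [start commit, end commit]
  'read.end-commit' = '20240102000000'
);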

Parameter Description

If the downstream consumes Hudi files too slowly, the upstream archives the Hudi files before they are read, and a "File Not Found" error occurs. Set proper consumption parameters to avoid this error.

Optimization suggestions:

  • Increase the value of read.tasks.
  • If a traffic limit (read.rate.limit) is set, increase its upper limit.
  • Increase the values of the upstream compaction, archive, and clean parameters so that files are retained longer (see the sketch after this list).
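
A sketch of the upstream (writer) side, assuming the Hudi Flink write options clean.retain_commits, archive.min_commits, and archive.max_commits in addition to compaction.delta_commits; the values are illustrative and should be tuned so that files are retained long enough for the slowest reader (archiving should start later than cleaning):

create table hudiSink (
  order_id STRING PRIMARY KEY NOT ENFORCED,
  order_name STRING,
  order_time TIMESTAMP(3)
) with (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'order_id',
  'write.precombine.field' = 'order_time',
  'compaction.delta_commits' = '10',    -- compact after more delta commits
  'clean.retain_commits' = '60',        -- assumed option: retain more commits before cleaning
  'archive.min_commits' = '70',         -- assumed option: archive only commits already cleaned
  'archive.max_commits' = '80'
);
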
Table 2 Parameters

connector
  Mandatory: Yes
  Default Value: None
  Data Type: String
  Description: Type of the table to be read. Set it to hudi.

path
  Mandatory: Yes
  Default Value: None
  Data Type: String
  Description: Path for storing the table, for example, obs://xx/xx.

table.type
  Mandatory: Yes
  Default Value: COPY_ON_WRITE
  Data Type: String
  Description: Hudi table type. The options are MERGE_ON_READ and COPY_ON_WRITE.

hoodie.datasource.write.recordkey.field
  Mandatory: Yes
  Default Value: None
  Data Type: String
  Description: Primary key of the table.

write.precombine.field
  Mandatory: Yes
  Default Value: None
  Data Type: String
  Description: Data combination field.

read.tasks
  Mandatory: No
  Default Value: 4
  Data Type: Integer
  Description: Parallelism of the tasks that read the Hudi table.

read.streaming.enabled
  Mandatory: Yes
  Default Value: false
  Data Type: Boolean
  Description: Whether to read data incrementally as a stream (true) or in batches (false). true is recommended.

read.streaming.start-commit
  Mandatory: No
  Default Value: The latest commit
  Data Type: String
  Description: Start position (closed interval) of incremental streaming or batch consumption, in yyyyMMddHHmmss format.

hoodie.datasource.write.keygenerator.type
  Mandatory: No
  Default Value: COMPLEX
  Data Type: Enum
  Description: Primary key generator type of the upstream table. The options are SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, and GLOBAL_DELETE.

read.streaming.check-interval
  Mandatory: No
  Default Value: 60
  Data Type: Integer
  Description: Interval (in seconds) for checking for new upstream commits. Use the default value 60 when traffic is high.

read.end-commit
  Mandatory: No
  Default Value: The latest commit
  Data Type: String
  Description: End commit time. The start and end commit times form a closed interval. By default, the latest commit time is used.

read.rate.limit
  Mandatory: No
  Default Value: 0
  Data Type: Integer
  Description: Traffic limit (records per second) for streaming read. The default value 0 indicates no limit. The value is the total limit; the limit for each read operator equals read.rate.limit divided by read.tasks (the number of read operators).

changelog.enabled
  Mandatory: No
  Default Value: false
  Data Type: Boolean
  Description: Whether to consume all changes, including intermediate changes. The options are:
    • true: All changes, including intermediate changes, are consumed.
    • false: Not all changes are consumed; UPSERT semantics are used instead, which guarantees that only the last merged record of each key is consumed, and intermediate changes may be merged before being read.
  See the notes below.

Notes

  • Only MOR tables support this mode. In this mode, Hudi retains all changes of a message (insert, update before, update after, and delete).
  • In non-changelog mode, intermediate changes within each batch of data read in streaming mode are merged; batch (snapshot) reads merge all intermediate changes, regardless of whether they have been written, so intermediate states are ignored.
  • After changelog.enabled is set to true, asynchronous compaction still merges intermediate changes into a single record. Therefore, if streaming consumption is not timely, only the last record can be read after compaction. You can reduce the compaction frequency to reserve a time buffer for the reader, for example, by setting the compaction parameters compaction.delta_commits to 5 and compaction.delta_seconds to 3600.
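
For example, a minimal sketch of a streaming source that consumes intermediate changes from a MOR table; the path, columns, and the start commit 20240101000000 are placeholders:

create table hudiSource (
  order_id STRING PRIMARY KEY NOT ENFORCED,
  order_name STRING,
  order_time TIMESTAMP(3)
) with (
  'connector' = 'hudi',
  'path' = 'obs://bucket/dir',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'order_id',
  'write.precombine.field' = 'order_time',
  'read.streaming.enabled' = 'true',
  'read.streaming.start-commit' = '20240101000000',  -- closed-interval start position (yyyyMMddHHmmss)
  'changelog.enabled' = 'true'                        -- consume intermediate changes (MOR tables only)
);

On the writer side, pairing this with compaction.delta_commits set to 5 and compaction.delta_seconds set to 3600, as suggested in the notes above, reserves a time buffer so that the reader can consume intermediate changes before compaction merges them.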

Example: Read Data from a Hudi Table and Output the Results to Print

  1. Create a Flink OpenSource SQL job. Enter the following job script and submit the job.
    When you create a job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs. Change the parameter values as needed in the following script.
    CREATE TABLE hudiSource (
      order_id STRING PRIMARY KEY NOT ENFORCED,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date STRING
    ) WITH (
      'connector' = 'hudi',
      'path' = 'obs://bucket/dir',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'order_id',
      'write.precombine.field' = 'order_time',
      'read.streaming.enabled' = 'true'
    );
    
    create table printSink (
      order_id STRING,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date STRING
    ) with (
      'connector' = 'print'
    );
    
    insert into
      printSink
    select
      *
    from
      hudiSource;
  2. After the job is submitted, the job status changes to Running. You can perform the following operations to view the output result:
    • Method 1:
      1. Log in to the DLI management console. In the navigation pane on the left, choose Job Management > Flink Jobs.
      2. Locate the row that contains the target Flink job, click More in the Operation column, and select FlinkUI.
      3. On the Flink UI, choose Task Managers, click the task name, and select Stdout to view job logs.
    • Method 2: If you select Save Job Log on the Running Parameters tab before submitting the job, perform the following operations:
      1. Log in to the DLI management console. In the navigation pane on the left, choose Job Management > Flink Jobs.
      2. Click the name of the corresponding Flink job, choose Run Log, click OBS Bucket, and locate the folder of the log you want to view according to the date.
      3. Go to the folder of the date, find the folder whose name contains taskmanager, download the taskmanager.out file, and view result logs.