Hudi Source Table
Function
Flink SQL reads data from Hudi tables.
Hudi is a data lake storage format that provides the ability to update and delete data and consume new data on top of HDFS. It supports multiple compute engines, provides insert, update, and delete (IUD) interfaces, and provides insert, update, and incremental pull capabilities on HDFS datasets.
| Type | Description | 
|---|---|
| Supported Flink table type | Source table and result table | 
| Supported Hudi table type | MOR table and COW table | 
| Supported read/write type | Batch read, batch write, streaming read, and streaming write | 
For details, see Hudi.
Caveats
- Set traffic limit for streaming read when a Hudi table is used as the source table.
    If streaming read traffic exceeds the maximum, job exceptions may occur. Set streaming read limit (read.rate.limit) that equals the peak value verified by the service pressure test. 
- Run compaction on Hudi tables to prevent the long checkpointing of the Hudi Source operator.
    If the Hudi Source operator takes long time for checkpointing, check whether the compaction of the Hudi table is normal. If compaction is not performed for a long time, the list performance deteriorates. 
- When streaming read is enabled for Hudi MOR tables, enable log indexing to improve the Flink streaming read performance.
    The read and write performance of Hudi MOR tables can be improved through log indexing. Add 'hoodie.log.index.enabled'='true' for Sink tables and Source tables. 
- When you create a Flink OpenSource SQL job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs.
Syntax
create table hudiSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
 )
with (
  'connector' = 'hudi',
  'path' = 'obs://xx',
  'table.type' = 'xx',
  'hoodie.datasource.write.recordkey.field' = 'xx',
  'write.precombine.field' = 'xx',
  'read.streaming.enabled' = 'xx'
   ...
);
 Parameter Description
 
 
   When the downstream consumes Hudi files too slowly, the upstream archives the Hudi files. As a result, the "File Not Found" error occurs. Set Proper Consumption Parameters to Avoid "File Not Found"
Optimization suggestions:
- Increase the value of read.tasks.
- If there is a triffic limit, increase the upper limit.
- Increase the upstream compaction, archive, and clean parameters.
| Parameter | Mandatory | Default Value | Data Type | Description | 
|---|---|---|---|---|
| connector | Yes | None | String | Type of the table to be read Set it to hudi. | 
| path | Yes | None | String | Path for storing tables. Example: obs://xx/xx | 
| table.type | Yes | COPY_ON_WRITE | String | Hudi table type. The options are: 
 | 
| hoodie.datasource.write.recordkey.field | Yes | None | String | Primary key of the table | 
| write.precombine.field | Yes | None | String | Data combination field. | 
| read.tasks | No | 4 | Integer | Parallelism of the tasks for reading the Hudi table | 
| read.streaming.enabled | Yes | false | Boolean | When set to true, data is read on streams incrementally; when set to false, data is read in batches. true is recommended. | 
| read.streaming.start-commit | No | By default, the latest commit is the start position. | String | Start position (closed interval) of incremental stream and batch consumption in yyyyMMddHHmmss format. | 
| hoodie.datasource.write.keygenerator.type | No | COMPLEX | Enum | Primary key generator type of the upstream table. The options are as follows: 
 | 
| read.streaming.check-interval | No | 60 | Integer | Check interval (in seconds) for detecting new upstream commits. Use the default value 60 during high traffic. | 
| read.end-commit | No | By default, the latest commit is the end position. | String | End commit time. The start and end commit time form a close interval. By default, the latest commit time is used. | 
| read.rate.limit | No | 0 | Integer | Traffic limit (records per second) for streaming read Default value 0 indicates that there is no limit. The value is the total limit. Limit for each operator = read.rate.limit/read.tasks (number of read operators). | 
| changelog.enabled | No | false | Boolean | Whether to consume all changes (including intermediate changes). The options are: 
 Notes 
 | 
Example: Read Data from a Hudi Table and Output the Results to Print
- Create a Flink OpenSource SQL job. Enter the following job script and submit the job.
    When you create a job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs. Change the values of the parameters in bold as needed in the following script.CREATE TABLE hudiSource ( order_id STRING PRIMARY KEY NOT ENFORCED, order_name STRING, order_time TIMESTAMP(3), order_date String ) WITH ( 'connector' = 'hudi', 'path' = 'obs://bucket/dir', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'order_time', 'read.streaming.enabled' = 'true' ); create table printSink ( order_id STRING, order_name STRING, order_time TIMESTAMP(3), order_date String ) with ( 'connector' = 'print' ); insert into printSink select * from hudiSource; 
- After the job is submitted, the job status changes to Running. You can perform the following operations to view the output result:
    - Method 1:
      - Log in to the DLI management console. In the navigation pane on the left, choose Job Management > Flink Jobs.
- Locate the row that contains the target Flink job, click More in the Operation column, and select FlinkUI.
- On the Flink UI, choose Task Managers, click the task name, and select Stdout to view job logs.
 
- Method 2: If you select Save Job Log on the Running Parameters tab before submitting the job, perform the following operations:
      - Log in to the DLI management console. In the navigation pane on the left, choose Job Management > Flink Jobs.
- Click the name of the corresponding Flink job, choose Run Log, click OBS Bucket, and locate the folder of the log you want to view according to the date.
- Go to the folder of the date, find the folder whose name contains taskmanager, download the taskmanager.out file, and view result logs.
 
 
- Method 1:
      
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot 
    