更新时间:2025-07-04 GMT+08:00

Hudi源表

功能描述

Flink SQL读取Hudi表数据。

Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及消费变化数据的能力。支持多种计算引擎,提供IUD接口,在HDFS的数据集上提供了插入更新和增量拉取的功能。

表1 支持类别

类别

详情

支持Flink表类型

源表、结果表。

支持Hudi表类型

MOR表,COW表。

支持读写类型

批量读,批量写,流式读,流式写。

更多具体使用可参考开源社区文档:Hudi

注意事项

  • 建议Hudi作为Source表时设置限流

    Hudi表作为Source表时,为防止数据上限超过流量峰值导致作业出现异常,建议设置限流(read.rate.limit),限流上限应该为业务上线压测的峰值。

  • 及时对Hudi表进行Compaction,防止Hudi source算子checkpoint完成时间过长

    当Hudi Source算子checkpoint完成时间长时,检查该Hudi表Compaction是否正常。因为当长时间不做Compaction时list性能会变差。

  • 流读Hudi MOR表时,建议开启log index特性提升Flink流读性能

    Hudi的Mor表可以通过log index提升读写性能, Sink和Source表添加属性 'hoodie.log.index.enabled'='true'

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。

语法格式

create table hudiSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
 )
with (
  'connector' = 'hudi',
  'path' = 'obs://xx',
  'table.type' = 'xx',
  'hoodie.datasource.write.recordkey.field' = 'xx',
  'write.precombine.field' = 'xx',
  'read.streaming.enabled' = 'xx'
   ...
);

参数说明

当下游消费Hudi过慢,上游写入端会把Hudi文件归档,导致File Not Found问题。设置合理的消费参数避免File Not Found问题。

优化建议:

  • 调大read.tasks。
  • 如果有限流,调大限流参数。
  • 调大上游compaction、archive、clean参数。
表2 参数名称

参数

是否必选

默认值

数据类型

参数说明

connector

String

读取表类型。需要填写'hudi'

path

String

表存储的路径。如obs://xx/xx

table.type

COPY_ON_WRITE

String

Hudi表类型。

  • MERGE_ON_READ
  • COPY_ON_WRITE

hoodie.datasource.write.recordkey.field

String

表的主键。

write.precombine.field

String

数据合并字段。

read.tasks

4

Integer

读hudi表task并行度。

read.streaming.enabled

false

Boolean

设置 true 开启流式增量模式,false批量读。建议值为true

read.streaming.start-commit

默认从最新 commit

String

Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间)

hoodie.datasource.write.keygenerator.type

COMPLEX

Enum

上游表主键生成类型:

  • SIMPLE(默认值)
  • COMPLEX
  • TIMESTAMP
  • CUSTOM
  • NON_PARTITION
  • GLOBAL_DELETE

read.streaming.check-interval

60

Integer

流读监测上游新提交的周期(秒),流量大时建议使用默认值,默认值:60。

read.end-commit

默认到最新 commit

String

Batch增量消费,通过参数“read.streaming.start-commit”指定起始消费位置,通过参数“read.end-commit”指定结束消费位置,为闭区间,即包含起始、结束的Commit,默认到最新Commit。

read.rate.limit

0

Integer

流读Hudi的限流速率,单位为每秒的条数。默认值:0,表示不限速。该值为总限速大小,每个算子的限速大小需除以读算子个数(read.tasks)。

changelog.enabled

false

Boolean

是否消费所有变更(包含中间变更)。

参数取值如下:

  • true:支持消费所有变更。
  • false:不消费所有变更,即UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被merge掉。

说明

  • 只有MOR表支持,在该模式下Hudi会保留消息的所有变更(I/-U/U/D)
  • 非changelog模式,流读单次的batch数据集会merge中间变更;批读(快照读)会合并所有的中间结果,不管中间状态是否已被写入,都将被忽略。
  • 开启changelog.enabled参数后,异步的压缩任务仍然会将中间变更合并成1条数据,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。但是,可以通过调整压缩的频率,预留一定的时间buffer给 reader,比如调整compaction.delta_commits:5和compaction.delta_seconds: 3600压缩参数。

示例 读取Hudi表的数据,并将结果输出到Print中

  1. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE hudiSource (
      order_id STRING PRIMARY KEY NOT ENFORCED,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date String
    ) WITH (
      'connector' = 'hudi',
      'path' = 'obs://bucket/dir',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'order_id',
      'write.precombine.field' = 'order_time',
      'read.streaming.enabled' = 'true'
    );
    
    create table printSink (
      order_id STRING,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date String
    ) with (
      'connector' = 'print'
    );
    
    insert into
      printSink
    select
      *
    from
      hudiSource;
  2. 该作业提交后,作业状态变成“运行中”,后续您可通过如下操作查看输出结果。
    • 方法一:
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 在对应Flink作业所在行的“操作”列,选择“更多 > FlinkUI”。
      3. 在FlinkUI界面,选择“Task Managers”,单击对应的任务名称,选择“Stdout”查看作业运行日志。
    • 方法二:如果在提交运行作业前“运行参数”选择了“保存作业日志”,可以通过如下操作查看。
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
      3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。