更新时间:2024-12-27 GMT+08:00
分享

Hudi源表

功能描述

Flink SQL读取Hudi表数据。

更多具体使用可参考开源社区文档:Hudi

注意事项

  • 建议Hudi作为Source表时设置限流

    Hudi表作为Source表时,为防止数据上限超过流量峰值导致作业出现异常,建议设置限流(read.rate.limit),限流上限应该为业务上线压测的峰值。

  • 及时对Hudi表进行Compaction,防止Hudi source算子checkpoint完成时间过长

    当Hudi Source算子checkpoint完成时间长时,检查该Hudi表Compaction是否正常。因为当长时间不做Compaction时list性能会变差。

  • 流读Hudi MOR表时,建议开启log index特性提升Flink流读性能

    Hudi的Mor表可以通过log index提升读写性能, Sink和Source表添加属性 'hoodie.log.index.enabled'='true'

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。

语法格式

create table hudiSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
 )
with (
  'connector' = 'hudi',
  'path' = 'obs://xx',
  'table.type' = 'xx',
  'hoodie.datasource.write.recordkey.field' = 'xx',
  'write.precombine.field' = 'xx',
  'read.streaming.enabled' = 'xx'
   ...
);

参数说明

当下游消费Hudi过慢,上游写入端会把Hudi文件归档,导致File Not Found问题。设置合理的消费参数避免File Not Found问题。

优化建议:

  • 调大read.tasks。
  • 如果有限流,调大限流参数。
  • 调大上游compaction、archive、clean参数。
表1 参数名称

参数

是否必选

默认值

数据类型

参数说明

connector

String

读取表类型。需要填写'hudi'

path

String

表存储的路径。如obs://xx/xx

table.type

COPY_ON_WRITE

String

Hudi表类型。

  • MERGE_ON_READ
  • COPY_ON_WRITE

hoodie.datasource.write.recordkey.field

String

表的主键。

write.precombine.field

String

数据合并字段。

read.tasks

4

Integer

读hudi表task并行度。

read.streaming.enabled

false

Boolean

设置 true 开启流式增量模式,false批量读。建议值为true

read.streaming.start-commit

默认从最新 commit

String

Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间)

hoodie.datasource.write.keygenerator.type

COMPLEX

Enum

上游表主键生成类型:

  • SIMPLE(默认值)
  • COMPLEX
  • TIMESTAMP
  • CUSTOM
  • NON_PARTITION
  • GLOBAL_DELETE

read.streaming.check-interval

1

Integer

流读监测上游新提交的周期(分钟),流量大时建议使用默认值,默认值:1。

read.end-commit

默认到最新 commit

String

Batch增量消费,通过参数“read.streaming.start-commit”指定起始消费位置,通过参数“read.end-commit”指定结束消费位置,为闭区间,即包含起始、结束的Commit,默认到最新Commit。

read.rate.limit

0

Integer

流读Hudi的限流速率,单位为每秒的条数。默认值:0,表示不限速。该值为总限速大小,每个算子的限速大小需除以读算子个数(read.tasks)。

changelog.enabled

false

Boolean

是否写入changelog消息。CDC场景填写为

true

示例 读取Hudi表的数据,并将结果输出到Print中

  1. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE hudiSource (
      order_id STRING PRIMARY KEY NOT ENFORCED,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date String
    ) WITH (
      'connector' = 'hudi',
      'path' = 'obs://bucket/dir',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'order_id',
      'write.precombine.field' = 'order_time',
      'read.streaming.enabled' = 'true'
    );
    
    create table printSink (
      order_id STRING,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date String
    ) with (
      'connector' = 'print'
    );
    
    insert into
      printSink
    select
      *
    from
      hudiSource;
  2. 该作业提交后,作业状态变成“运行中”,后续您可通过如下操作查看输出结果。
    • 方法一:
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 在对应Flink作业所在行的“操作”列,选择“更多 > FlinkUI”。
      3. 在FlinkUI界面,选择“Task Managers”,单击对应的任务名称,选择“Stdout”查看作业运行日志。
    • 方法二:如果在提交运行作业前“运行参数”选择了“保存作业日志”,可以通过如下操作查看。
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
      3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

相关文档