更新时间:2025-07-04 GMT+08:00

Hudi结果表

功能描述

Flink SQL作业写Hudi表。

Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及消费变化数据的能力。支持多种计算引擎,提供IUD接口,在HDFS的数据集上提供了插入更新和增量拉取的功能。

表1 支持类别

类别

详情

支持Flink表类型

源表、结果表。

支持Hudi表类型

MOR表,COW表。

支持读写类型

批量读,批量写,流式读,流式写。

更多具体使用可参考开源社区文档:Hudi

注意事项

  • 推荐使用SparkSQL统一建表
  • 表名必须满足Hive格式要求
    1. 表名必须以字母或下划线开头,不能以数字开头。
    2. 表名只能包含字母、数字、下划线。
    3. 表名长度不能超过128个字符。
    4. 表名中不能包含空格和特殊字符,如冒号、分号、斜杠等。
    5. 表名不区分大小写,但建议使用小写字母。
    6. Hive保留关键字不能作为表名,如select、from、where等。

    示例:

    my_table、customer_info、sales_data

    由于作业在触发CheckPoint时才会往Hudi表中写数据,所以需要开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。

  • 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
  • checkpoint容忍失败次数设置,execution.checkpointing.tolerable-failed-checkpoints。

    Flink On Hudi作业建议设置checkpoint容忍次数多次,如100。

  • 若需要使用Hive风格分区,需同时配置如下参数:
    'hoodie.datasource.write.hive_style_partitioning' = 'true'
    'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
  • 默认Hudi写表是Flink状态索引,如果需要使用bucket索引需要在Hudi写表中添加参数:
    'index.type'='BUCKET', 
    'hoodie.bucket.index.num.buckets'='Hudi表中每个分区划分桶的个数',
    'hoodie.bucket.index.hash.field'='recordkey.field'
    • hoodie.bucket.index.num.buckets:Hudi表中每个分区划分桶的个数,每个分区内的数据通过Hash方式放入每个桶内。建表或第一次写入数据时设置后不能修改,否则更新数据会存在异常。
    • hoodie.bucket.index.hash.field:进行分桶时计算Hash值的字段,必须为主键的子集,默认为Hudi表的主键。该参数不填则默认为recordkey.field。
  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • Spark离线完成Compaction计划的执行,以及Clean和Archive操作,详见Hudi数据表Compaction规范。

    Flink作业写MOR表时需要做异步compaction,控制compaction间隔的参数请参考Hudi官网

    run compaction on <database name>. <table name>;   // 执行Compaction计划
    run clean on <database name>. <table name>;        // 执行Clean操作
    run archivelog on <database name>.<table name>;    // 执行Archive操作

语法格式

create table hudiSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
 )
with (
  'connector' = 'hudi',
  'path' = 'obs://xx',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'xx',
  'write.precombine.field' = 'xx',
  'read.streaming.enabled' = 'true'
   ...
);

参数说明

参数名称

是否必选

默认值

数据类型

参数描述

connector

String

读取表类型。需要填写为'hudi'

path

String

表存储的路径

table.type

COPY_ON_WRITE

String

Hudi表类型。

  • MERGE_ON_READ
  • COPY_ON_WRITE

hoodie.datasource.write.recordkey.field

String

表的主键

  • 支持通过PRIMARY KEY语法设置主键字段。
  • 支持使用英文逗号(,)分隔多个字段。

hoodie.datasource.write.partitionpath.field

String

Hudi表的分区字段。无分区表不指定,分区表必须指定

write.precombine.field

String

数据合并字段

基于此字段的大小来判断消息是否进行更新。

如果您没有设置该参数,则系统默认会按照消息在引擎内部处理的先后顺序进行更新。

write.payload.class

String

write.payload.class 参数用于定义数据合并逻辑的方式,具体来说,它指定了在合并更新操作时如何处理相同主键的多条记录。

默认值 OverwriteWithLatestAvroPayload。该策略用于旧记录都会被新记录覆盖。同时也提供了多种预置Payload供用户使用,如

DefaultHoodieRecordPayload、

OverwriteNonDefaultsWithLatestAvroPayload、OverwriteWithLatestAvroPayload及EmptyHoodieRecordPayload。

write.tasks

4

Integer

写hudi表task并行度,建议值为4

index.type

INMEMORY

String

支持 INMEMORY 或者 BUCKET。默认是INMEMORY

index.bootstrap.enabled

true

Boolean

Flink默认采用的是内存索引(使用Bueckt索引时不配置该项),需要将数据的主键缓存到内存中,保证目标表的数据唯一,因此需要配置该值,否则会导致数据重复,默认值:true。

write.index_bootstrap.tasks

环境默认并行度

Integer

“index.bootstrap.enabled”开启后有效,增加任务数提升启动速度,默认值为环境默认并行度。

hoodie.bucket.index.num.buckets

5

Integer

Hudi表中每个分区划分桶的个数,每个分区内的数据通过Hash方式放入每个桶内。建表或第一次写入数据时设置后不能修改,否则更新数据会存在异常

hoodie.bucket.index.hash.field

recordkey.field

String

进行分桶时计算Hash值的字段,必须为主键的子集,默认为Hudi表的主键。该参数不填则默认为recordkey.field

index.state.ttl

0

Integer

索引数据保存时长,默认值:0(单位:天),表示永久不失效。

compaction.async.enabled

false

Boolean

是否开启在线压缩。

  • true:开启
  • false:关闭

建议关闭在线压缩,提升性能。但是调度compaction.schedule.enabled仍然建议开启,之后可通过离线异步压缩,执行阶段性生成的压缩plan。

clean.async.enabled

true

Boolean

COW表:设置为true

MOR表,且默认开启异步压缩时(compaction.async.enabled = false),需要设置为false,采用异步clean。建议和Compaction放在一起异步去执行

hoodie.archive.automatic

true

String

COW表:设置为true

MOR表,且默认开启异步压缩时(compaction.async.enabled = false),需要设置为false,采用异步archive。建议和Compaction放在一起异步去执行

compaction.schedule.enabled

true

Boolean

是否阶段性生成压缩plan,即使关闭在线压缩的情况下也建议开启

compaction.delta_commits

5

Integer

MOR表Compaction计划触发条件。建议值为200。

compaction.tasks

4

Integer

开启在线压缩时,压缩Hudi表task并行度。建议关闭在线压缩,提升性能。

hive_sync.enable

false

Boolean

是否启用向Hive同步表信息。

当你需要将Hudi表的元数据同步到 Hive元数据存储中,以便通过Hive查询工具或在DLI控制台的数据管理中访问Hudi表时,需要将此参数设置为 true。

  • true:Hudi 将会把表的元数据(如表结构、分区信息等)同步到 Hive 中。
  • false:不会向Hive同步表信息。

开启向hive同步表信息后会使用catalog相关权限,还需配置访问catalog的委托权限。

hive_sync.mode

jdbc

Enum

Hudi表元数据同步到Hive的方式

根据你的环境和需求选择合适的同步模式。

  • jdbc:通过JDBC连接到 Hive Server来同步元数据。
  • hms:通过Hive Meta Client 直接与Hive Metastore交互来同步元数据。
  • hiveql:通过执行Hive QL语句来同步元数据。

hive_sync.table

String

同步到Hive的表名称,Hudi表的元数据将同步到这个Hive表中

hive_sync.db

default

String

同步到Hive的数据库名称,Hudi表的元数据将同步到这个Hive数据库中的表。

hive_sync.support_timestamp

true

Boolean

是否支持时间戳,用于确保Hudi表的元数据同步到Hive时能够正确处理时间戳字段。

建议配置为True。

changelog.enabled

false

Boolean

是否消费所有变更(包含中间变更)。

参数取值如下:

  • true:支持消费所有变更。
  • false:不消费所有变更,即UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被merge掉。

说明

  • 只有MOR表支持,在该模式下Hudi会保留消息的所有变更(I/-U/U/D)
  • 非changelog模式,流读单次的batch数据集会merge中间变更;批读(快照读)会合并所有的中间结果,不管中间状态是否已被写入,都将被忽略。
  • 开启changelog.enabled参数后,异步的压缩任务仍然会将中间变更合并成1条数据,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。但是,可以通过调整压缩的频率,预留一定的时间buffer给 reader,比如调整compaction.delta_commits:5和compaction.delta_seconds: 3600压缩参数。

示例 使用DataGen connector产生数据,输出到Hudi的MOR表中(以订单日期作为分区字段),并使用HMS方式同步元数据到Hive

  1. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    create table orderSource (
      order_id STRING,
      order_name STRING,
      order_time TIMESTAMP(3)
    ) with (
      'connector' = 'datagen' ,
      'rows-per-second'='100'
    );
    
    CREATE TABLE huditest (
      order_id STRING PRIMARY KEY NOT ENFORCED,
      order_name STRING,
      order_time TIMESTAMP(3),
      order_date String
    ) PARTITIONED BY (order_date) WITH (
      'connector' = 'hudi',
      'path' = 'obs://bucket/dir',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'order_id',
      'write.precombine.field' = 'order_time',
      'hive_sync.enable' = 'true',
      'hive_sync.mode' = 'hms',
      'hive_sync.table' = 'huditest',
      'hive_sync.db' = 'dbtest'
    );
    
    insert into
      huditest
    select
      order_id, order_name, order_time, DATE_FORMAT(order_time, 'yyyyMMdd')
    from
      orderSource;
  2. 在Spark SQL中执行下述语句,查看写入结果
    SELECT * FROM dbtest.huditest where order_date = 'xx'