更新时间:2024-11-29 GMT+08:00

流式写Hudi表规范

流式写Hudi参数说明

表1 流式写Hudi参数说明

参数名称

参数描述

建议值

说明

Connector

读取表类型。

hudi

必填

Path

表存储的路径。

根据实际填写

必填

table.type

Hudi表类型:

  • MERGE_ON_READ
  • COPY_ON_WRITE(默认值)

COPY_ON_WRITE

必填

hoodie.datasource.write.recordkey.field

表的主键。

根据实际填写

必填

write.precombine.field

数据合并字段。

根据实际填写

必填

write.tasks

写Hudi表task并行度,默认值:4。

4

选填

index.bootstrap.enabled

Flink采用的是内存索引(使用Bueckt索引时不配置该项),需要将数据的主键缓存到内存中,保证目标表的数据唯一,因此需要配置该值,否则会导致数据重复,默认值:true。

true

选填

write.index_bootstrap.tasks

“index.bootstrap.enabled”开启后有效,增加任务数提升启动速度,默认值为环境默认并行度。

-

选填

index.state.ttl

索引数据保存时长,默认值:0(单位:天),表示永久不失效。

-

选填

hoodie.datasource.write.keygenerator.type

上游表主键生成类型:

  • SIMPLE(默认值)
  • COMPLEX(Spark建表时必填,且需设置为COMPLEX或与Spark建表时指定的参数值相同)
  • TIMESTAMP
  • CUSTOM
  • NON_PARTITION
  • GLOBAL_DELETE

COMPLEX

选填

compaction.delta_commits

MOR表Compaction计划触发条件,默认值:5次。

200

选填

compaction.async.enabled

是否开启在线压缩,将Compaction操作转移到SparkSQL运行,提升写性能。

建议设置为false,用SparkSQL做异步Compaction。

false

选填

clean.async.enabled

是否在新提交时立即清理旧提交,默认启用。

  • true(默认值)
  • false

false

选填

clean.retain_commits

要保留的提交数。默认30次。

-

选填

hoodie.archive.automatic

启用后,在每次提交后立即调用存档表服务。

  • true(默认值)
  • false

false

选填

archive.min_commits

将较旧的提交存档到顺序日志之前要保留的最小提交数,默认值为40

500

选填

archive.max_commits

将较旧的提交存档到顺序日志之前要保留的最大提交数,默认值为50

600

选填

hive_sync.enable

是否向Hive同步表信息。

true

选填

hive_sync.metastore.uris

Hivemeta URI信息。

根据实际填写

选填

hive_sync.jdbc_url

Hive JDBC链接。

根据实际填写

选填

hive_sync.table

Hive的表名。

根据实际填写

选填

hive_sync.db

Hive的数据库名。

根据实际填写

选填

hive_sync.support_timestamp

是否支持时间戳。

true

选填

changelog.enabled

是否写入Changelog消息:

  • false:不写入(默认值)。
  • true:写入,CDC场景需写入Changelog消息。

false

选填

hoodie.datasource.write.hive_style_partitioning

是否使用Hive风格的分区格式:
  • false:不使用Hive风格,分区目录名称仅使用分区值(默认值)。
  • true:使用Hive风格,分区目录名称格式为<partition_column_name>=<partition_value>。

    如果是Spark创建的Hudi分区表时为必填,且需要设置为true。

-

选填

filter.delete.record.enabled

是否过滤删除消息:

  • false:不过滤(默认值)。
  • true:过滤

在不开启changelog条件下,上游delete消息不支持写入Hudi表。

true

选填

delete.empty.instant.ttl

如果没有数据写入到instant并且instant的LLT超过配置的值(单位ms),则删除该instant并创建新的instant。默认值为5分钟,-1表示禁用该功能。

10000

选填

流式写Hudi开发建议

  • 表名必须满足Hive格式要求,如my_table、customer_info、sales_data等,详细规则如下:
    • 表名必须以字母或下划线开头,不能以数字开头。
    • 表名只能包含字母、数字、下划线和点号(.)。
    • 表名长度不能超过128个字符。
    • 表名中不能包含空格和特殊字符,如冒号、分号、斜杠等。
    • 表名不区分大小写,但建议使用小写字母
    • Hive保留关键字不能作为表名,如select、from、where等。
  • 建议使用Spark SQL统一建Hudi表,示例如下:
    create table hudi_mor_par_ddl (
      id int,
      comb int,
      col0 int,
      col1 bigint,
      col2 float,
      col3 double,
      col4 decimal(30, 10),
      col5 string,
      col6 date,
      col7 timestamp,
      col8 boolean,
      col9 binary,
      par date
    ) using hudi partitioned by(par) options(
      type = 'mor',
      primaryKey = 'id',
      preCombineField = 'comb',
      hoodie.index.type = 'BUCKET'
    );
  • 推荐使用Spark异步任务对Hudi表进行Compaction,示例如下:

    Flink添加参数:

      'compaction.async.enabled' = 'false',
      'compaction.delta_commits' = '5',
      'clean.async.enabled' = 'false',
      'hoodie.archive.automatic' = 'false',

    SparkSQL命令:

    set hoodie.clean.automatic = true;
      set hoodie.clean.async = false;
      set hoodie.cleaner.commits.retained = 10;
      set hoodie.compact.inline = true;
      set hoodie.run.compact.only.inline = true;
      set hoodie.keep.min.commits = 500;
      set hoodie.keep.max.commits = 600;
      run compaction on tableName;
      run archivelog on tableName;
  • DDL变更对流写Hudi表的影响

    DDL变更(如添加列、修改列类型、修改列名、删除列)等都影响Hudi表写入作业,需要在变更前停止作业。