流式写Hudi表规范
流式写Hudi参数说明
参数名称 |
参数描述 |
建议值 |
说明 |
---|---|---|---|
Connector |
读取表类型。 |
hudi |
必填 |
Path |
表存储的路径。 |
根据实际填写 |
必填 |
table.type |
Hudi表类型:
|
COPY_ON_WRITE |
必填 |
hoodie.datasource.write.recordkey.field |
表的主键。 |
根据实际填写 |
必填 |
write.precombine.field |
数据合并字段。 |
根据实际填写 |
必填 |
write.tasks |
写Hudi表task并行度,默认值:4。 |
4 |
选填 |
index.bootstrap.enabled |
Flink采用的是内存索引(使用Bueckt索引时不配置该项),需要将数据的主键缓存到内存中,保证目标表的数据唯一,因此需要配置该值,否则会导致数据重复,默认值:true。 |
true |
选填 |
write.index_bootstrap.tasks |
“index.bootstrap.enabled”开启后有效,增加任务数提升启动速度,默认值为环境默认并行度。 |
- |
选填 |
index.state.ttl |
索引数据保存时长,默认值:0(单位:天),表示永久不失效。 |
- |
选填 |
hoodie.datasource.write.keygenerator.type |
上游表主键生成类型:
|
COMPLEX |
选填 |
compaction.delta_commits |
MOR表Compaction计划触发条件,默认值:5次。 |
200 |
选填 |
compaction.async.enabled |
是否开启在线压缩,将Compaction操作转移到SparkSQL运行,提升写性能。 建议设置为false,用SparkSQL做异步Compaction。 |
false |
选填 |
clean.async.enabled |
是否在新提交时立即清理旧提交,默认启用。
|
false |
选填 |
clean.retain_commits |
要保留的提交数。默认30次。 |
- |
选填 |
hoodie.archive.automatic |
启用后,在每次提交后立即调用存档表服务。
|
false |
选填 |
archive.min_commits |
将较旧的提交存档到顺序日志之前要保留的最小提交数,默认值为40 |
500 |
选填 |
archive.max_commits |
将较旧的提交存档到顺序日志之前要保留的最大提交数,默认值为50 |
600 |
选填 |
hive_sync.enable |
是否向Hive同步表信息。 |
true |
选填 |
hive_sync.metastore.uris |
Hivemeta URI信息。 |
根据实际填写 |
选填 |
hive_sync.jdbc_url |
Hive JDBC链接。 |
根据实际填写 |
选填 |
hive_sync.table |
Hive的表名。 |
根据实际填写 |
选填 |
hive_sync.db |
Hive的数据库名。 |
根据实际填写 |
选填 |
hive_sync.support_timestamp |
是否支持时间戳。 |
true |
选填 |
changelog.enabled |
是否写入Changelog消息:
|
false |
选填 |
hoodie.datasource.write.hive_style_partitioning |
- |
选填 |
|
filter.delete.record.enabled |
是否过滤删除消息:
在不开启changelog条件下,上游delete消息不支持写入Hudi表。 |
true |
选填 |
delete.empty.instant.ttl |
如果没有数据写入到instant并且instant的LLT超过配置的值(单位ms),则删除该instant并创建新的instant。默认值为5分钟,-1表示禁用该功能。 |
10000 |
选填 |
流式写Hudi开发建议
- 表名必须满足Hive格式要求,如my_table、customer_info、sales_data等,详细规则如下:
- 表名必须以字母或下划线开头,不能以数字开头。
- 表名只能包含字母、数字、下划线和点号(.)。
- 表名长度不能超过128个字符。
- 表名中不能包含空格和特殊字符,如冒号、分号、斜杠等。
- 表名不区分大小写,但建议使用小写字母。
- Hive保留关键字不能作为表名,如select、from、where等。
- 建议使用Spark SQL统一建Hudi表,示例如下:
create table hudi_mor_par_ddl ( id int, comb int, col0 int, col1 bigint, col2 float, col3 double, col4 decimal(30, 10), col5 string, col6 date, col7 timestamp, col8 boolean, col9 binary, par date ) using hudi partitioned by(par) options( type = 'mor', primaryKey = 'id', preCombineField = 'comb', hoodie.index.type = 'BUCKET' );
- 推荐使用Spark异步任务对Hudi表进行Compaction,示例如下:
'compaction.async.enabled' = 'false', 'compaction.delta_commits' = '5', 'clean.async.enabled' = 'false', 'hoodie.archive.automatic' = 'false',
SparkSQL命令:
set hoodie.clean.automatic = true; set hoodie.clean.async = false; set hoodie.cleaner.commits.retained = 10; set hoodie.compact.inline = true; set hoodie.run.compact.only.inline = true; set hoodie.keep.min.commits = 500; set hoodie.keep.max.commits = 600; run compaction on tableName; run archivelog on tableName;
- DDL变更对流写Hudi表的影响