Hudi源表
注意事项
- 建议Hudi作为Source表时设置限流
Hudi表作为Source表时,为防止数据上限超过流量峰值导致作业出现异常,建议设置限流(read.rate.limit),限流上限应该为业务上线压测的峰值。
- 及时对Hudi表进行Compaction,防止Hudi source算子checkpoint完成时间过长
当Hudi Source算子checkpoint完成时间长时,检查该Hudi表Compaction是否正常。因为当长时间不做Compaction时list性能会变差。
- 流读Hudi MOR表时,建议开启log index特性提升Flink流读性能
Hudi的Mor表可以通过log index提升读写性能, Sink和Source表添加属性 'hoodie.log.index.enabled'='true'
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
语法格式
create table hudiSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'hudi', 'path' = 'obs://xx', 'table.type' = 'xx', 'hoodie.datasource.write.recordkey.field' = 'xx', 'write.precombine.field' = 'xx', 'read.streaming.enabled' = 'xx' ... );
参数说明
当下游消费Hudi过慢,上游写入端会把Hudi文件归档,导致File Not Found问题。设置合理的消费参数避免File Not Found问题。
优化建议:
- 调大read.tasks。
- 如果有限流,调大限流参数。
- 调大上游compaction、archive、clean参数。
参数 |
是否必选 |
默认值 |
数据类型 |
参数说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
读取表类型。需要填写'hudi' |
path |
是 |
无 |
String |
表存储的路径。如obs://xx/xx |
table.type |
是 |
COPY_ON_WRITE |
String |
Hudi表类型。
|
hoodie.datasource.write.recordkey.field |
是 |
无 |
String |
表的主键。 |
write.precombine.field |
是 |
无 |
String |
数据合并字段。 |
read.tasks |
否 |
4 |
Integer |
读hudi表task并行度。 |
read.streaming.enabled |
是 |
false |
Boolean |
设置 true 开启流式增量模式,false批量读。建议值为true |
read.streaming.start-commit |
否 |
默认从最新 commit |
String |
Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间) |
hoodie.datasource.write.keygenerator.type |
否 |
COMPLEX |
Enum |
上游表主键生成类型:
|
read.streaming.check-interval |
否 |
1 |
Integer |
流读监测上游新提交的周期(分钟),流量大时建议使用默认值,默认值:1。 |
read.end-commit |
否 |
默认到最新 commit |
String |
Batch增量消费,通过参数“read.streaming.start-commit”指定起始消费位置,通过参数“read.end-commit”指定结束消费位置,为闭区间,即包含起始、结束的Commit,默认到最新Commit。 |
read.rate.limit |
否 |
0 |
Integer |
流读Hudi的限流速率,单位为每秒的条数。默认值:0,表示不限速。该值为总限速大小,每个算子的限速大小需除以读算子个数(read.tasks)。 |
changelog.enabled |
否 |
false |
Boolean |
是否写入changelog消息。CDC场景填写为 true |
示例 读取Hudi表的数据,并将结果输出到Print中
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE hudiSource ( order_id STRING PRIMARY KEY NOT ENFORCED, order_name STRING, order_time TIMESTAMP(3), order_date String ) WITH ( 'connector' = 'hudi', 'path' = 'obs://bucket/dir', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'order_time', 'read.streaming.enabled' = 'true' ); create table printSink ( order_id STRING, order_name STRING, order_time TIMESTAMP(3), order_date String ) with ( 'connector' = 'print' ); insert into printSink select * from hudiSource;
- 该作业提交后,作业状态变成“运行中”,后续您可通过如下操作查看输出结果。
- 方法一:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 在对应Flink作业所在行的“操作”列,选择“更多 > FlinkUI”。
- 在FlinkUI界面,选择“Task Managers”,单击对应的任务名称,选择“Stdout”查看作业运行日志。
- 方法二:如果在提交运行作业前“运行参数”选择了“保存作业日志”,可以通过如下操作查看。
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
- 方法一: