FlinkServer对接Hudi
操作场景
本指南通过使用FlinkServer写FlinkSQL对接Hudi。FlinkSQL读写Hudi时,不支持定义TINYINT、SMALLINT和TIME类型。
前提条件
- 集群已安装HDFS、Yarn、Hive、Spark、Flink和Kafka等服务。
- 包含Flink、Kafka服务的客户端已安装,例如安装路径为:/opt/client。
- Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。并且用户需要添加hadoop、hive、kafkaadmin用户组,以及Manager_administrator角色。
- 仅支持Flink与Hive组件共集群。
约束与限制
本章节适用于MRS 3.6.0-LTS.1及之后的版本。
MRS 3.6.0-LTS.1版本,MOR表已默认是BUCKET索引,建表时必须指定BUCKET桶数,桶数请参考《MapReduce服务(MRS)组件设计开发规范》的“确定表索引”章节的“确认BUCKET索引桶数(BUCKET索引表必须参考此内容预估桶数)”来计算。
操作步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考Catalog管理创建一个Hudi Catalog,如:hudi_catalog。
- 新建Hudi表,具体操作如下:
- 新建一个Hudi mor表,表名为hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,完整示例如下:
CREATE TABLE hudi_catalog.`default`.hudi_table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'index.type' = 'BUCKET', 'hoodie.bucket.index.num.buckets' = '4', 'write.tasks' = '4' );
- 新建一个Hudi cow表,表名为hudi_catalog.`default`.hudi_cow_table,具体操作可参考数据管理,完整示例如下:
CREATE TABLE hudi_catalog.`default`.hudi_cow_table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_cow', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'write.tasks' = '4' );
- FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html。
- MRS 3.6.0-LTS.1版本Flink使用catalog写hudi mor表需要指定'index.type',否则运行时会报错。
- 新建一个Hudi mor表,表名为hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,完整示例如下:
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。然后输入SQL,执行SQL校验通过后,启动作业。如下SQL示例将作为3个作业分别添加,依次运行。
所有作业均需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
建议开启故障恢复策略,提高作业可靠性。例如“故障恢复策略”选择“fixed-delay”,“重试次数”设置为“3”,“失败重试间隔”设置为“30”,重试次数和间隔可按实际业务需要填写。
作业启动,状态显示“运行中”后,可通过“更多 > 作业详情”跳转到Flink作业的原生UI页面,查看Job运行情况。
- 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
- 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常,建议CheckPoint间隔为分钟级。
- 作业1:FlinkSQL流式写入MOR表
CREATE TABLE datagen_source( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); insert into hudi_catalog.`default`.hudi_table select * from datagen_source;
- 作业2:FlinkSQL流式写入COW表
CREATE TABLE datagen_source( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); insert into hudi_catalog.`default`.hudi_cow_table select * from datagen_source;
- 作业3:FlinkSQL流式读取MOR和COW表并合并数据输出到Kafka。注意作业3需要等待作业1和作业2均启动后,状态显示为“运行中”后再执行SQL校验和启动作业(否则SQL校验可能提示错误,找不到Hudi表目录)。
CREATE TABLE print_sink( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'print' ); insert into print_sink select * from hudi_catalog.`default`.hudi_table union all select * from hudi_catalog.`default`.hudi_cow_table;
- 单击左上角“提交”提交作业。
- 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
- 进入作业详情界面,单击“Task Managers > Stdout”,查看是否正常打印表中数据。
FlinkSQL Lookup Join Hudi使用须知
适用于MRS 3.5.0及以后版本。
- 使用lookup.join.cache.ttl参数来控制维表数据的加载周期,默认值为60min。
- Hudi维表数据会被加载到Flink TaskManager Heap中,所以不推荐大于10万行记录的Hudi表作为维表。
- 维表的新增、更新数据需要等到下一次加载周期后,才能被加载进来参与计算。
- 新建一个Hudi表,例如:hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,完整示例如下:
CREATE TABLE hudi_catalog.`default`.hudi_table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudimor', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'index.type' = 'BUCKET', 'hoodie.bucket.index.num.buckets' = '4', 'write.precombine.field' = 'ts', 'lookup.join.cache.ttl' = '60min' );
CREATE TABLE datagen(uuid varchar(20), proctime as PROCTIME()) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE blackhole (
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH ('connector' = 'blackhole');
insert into
blackhole
select
t1.uuid as uuid,
t2.name as name,
t2.age as age,
t2.ts as ts,
t2.p as p
FROM
datagen AS t1
left JOIN hudi_catalog.`default`.hudi_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.uuid = t2.uuid; WITH主要参数说明
| 方式 | 配置项 | 是否必选 | 默认值 | 描述 |
|---|---|---|---|---|
| 读取 | read.tasks | 否 | 4 | 读Hudi表task并行度。 |
| read.streaming.enabled | 否 | false | 是否开启流读模式。 | |
| read.streaming.start-commit | 否 | 默认从最新commit | Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间)。 | |
| read.end-commit | 否 | 默认到最新commit | Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的结束消费位置(闭区间)。 | |
| 写入 | write.tasks | 否 | 4 | 写Hudi表task并行度。 |
| index.bootstrap.enabled | 否 | false | 是否开启索引加载,开启后会将已存表的最新数据一次性加载到state中。 如果有全量数据接增量的需求,且已经有全量的离线Hoodie表,需要接上实时写入,同时保证数据不重复,可以开启索引加载功能。 | |
| write.index_bootstrap.tasks | 否 | 4 | 如果启动作业时索引加载缓慢,可以调大该值,调大该值后可以加快bootstrap阶段的效率,但bootstrap阶段会阻塞CheckPoint。 | |
| compaction.async.enabled | 否 | true | 是否开启在线压缩。 | |
| compaction.schedule.enabled | 否 | true | 是否阶段性生成压缩plan,即使关闭在线压缩的情况下也建议开启。 | |
| compaction.tasks | 否 | 10 | 压缩Hudi表task并行度。 | |
| index.state.ttl | 否 | 7D | 索引保存的时间,默认为7天(单位:天),小于“0”表示永久保存。 索引是判断数据重复的核心数据结构,对于长时间的更新,比如更新一个月前的数据,需要将该值调大。 | |
| 维表 | lookup.join.cache.ttl | 否 | 60min | Hudi作为维表时数据reload的时间周期。 |
FlinkSQL建Hudi表支持字段注释
通过Hudi Catalog创建的Hudi表,支持同步字段注释信息到Hive。
- 参考Catalog管理创建一个Hudi Catalog,如:hudi_catalog。
- 新建一个Hudi表,例如:hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,SQL示例如下:
CREATE TABLE hudi_catalog.`default`.hudi_table( id int comment '主键', name VARCHAR(20) comment '名字', age INT comment '年龄', `date` VARCHAR(20) ) PARTITIONED BY (`date`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi_mor',--hudi表存储路径 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'id', 'hoodie.bucket.index.num.buckets'='BUCKET桶数量', 'write.precombine.field' = 'age', 'hoodie.datasource.write.hive_style_partitioning' = 'true', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'index.type' = 'BUCKET', 'hoodie.bucket.index.num.buckets' = '8', 'write.tasks' = '8' );
更多参考
FlinkSQL读写Hudi表参数简化可参考Hudi创建表参数简化指导。