文档首页/
MapReduce服务 MRS/
组件操作指南(LTS版)(巴黎区域)/
使用Flink/
使用Flink WebUI/
FlinkServer对接外部组件/
FlinkServer对接Hudi
更新时间:2022-12-14 GMT+08:00
FlinkServer对接Hudi
操作场景
本指南通过使用FlinkServer写FlinkSQL对接Hudi。
前提条件
- 集群已安装HDFS、Yarn、Flink和Hudi等服务。
- 包含Hudi服务的客户端已安装,例如安装路径为:/opt/Bigdata/client。
- Flink要求1.12.2及以上版本,Hudi要求0.9.0及以上版本。
- 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI,如:flink_admin。
Flink对Hudi表的读写支持
Flink对Hudi表的COW表、MOR表类型读写支持详情见表1。
目前FlinkSQL对接Hudi仅支持Snapshot mode和Read Optimized mode两种模式。
操作步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。并启动作业。
需勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
- 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
- FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
- FlinkSQL流式写入MOR表,仅支持kafka json格式。
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); insert into stream_mor select * from kafka;
- FlinkSQL流式写入COW表
CREATE TABLE stream_write_cow( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_cow' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); insert into stream_write_cow select * from kafka;
- FlinkSQL读取MOR表
CREATE TABLE hudi_read_spark_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/default/tb_hudimor', 'table.type' = 'MERGE_ON_READ' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts timestamp(6)INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); insert into hudi_read_spark_mor select * from kafka;
- FlinkSQL写入Hudi表数据后,通过Spark、Hive读该数据时,需要使用run_hive_sync_tool.sh将Hudi表数据同步到Hive中。同步方法请参考将Hudi表数据同步到Hive。
同步前需要保证不再新增分区,同步后新增的分区将不能被读取。
父主题: FlinkServer对接外部组件