更新时间:2024-05-28 GMT+08:00
FlinkServer对接Hudi
本章节适用于MRS 3.1.2及之后的版本。
操作场景
本指南通过使用FlinkServer写FlinkSQL对接Hudi。
前提条件
- 集群已安装HDFS、Yarn、Flink和Hudi等服务。
- 包含Hudi服务的客户端已安装,例如安装路径为:/opt/client。
- Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。
- 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
Flink对Hudi表的读写支持
Flink对Hudi表的COW表、MOR表类型读写支持详情见表1。
操作步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。并启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
- 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
- FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
- FlinkSQL流式写入MOR表。
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); insert into stream_mor select * from kafka;
- FlinkSQL流式写入COW表
CREATE TABLE stream_write_cow( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_cow' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); insert into stream_write_cow select * from kafka;
- FlinkSQL读取MOR表
CREATE TABLE hudi_read_spark_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/default/tb_hudimor', 'table.type' = 'MERGE_ON_READ' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts timestamp(6)INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); insert into kafka select * from hudi_read_spark_mor;
- FlinkSQL写入Hudi表数据后,通过Spark、Hive读该数据时,需要使用run_hive_sync_tool.sh将Hudi表数据同步到Hive中。同步方法请参考将Hudi表数据同步到Hive。
同步前需要保证不再新增分区,同步后新增的分区将不能被读取。
Flink On Hudi同步元数据到Hive
适用于MRS 3.2.0及之后版本。
- 使用JDBC方式同步元数据到Hive
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值' );
- hive_sync.jdbc_url:Hive客户端component_env文件中CLIENT_HIVE_URI的值,如果该值中存在“\”需将其删除。
- 如果需要使用Hive风格分区,需同时配置如下参数:
- 'hoodie.datasource.write.hive_style_partitioning' = 'true'
- 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
- Flink on Hudi并同步数据至Hive的任务,因为Hudi对大小写敏感,Hive对大小写不敏感,所以在Hudi表中的字段不建议使用大写字母,否则可能会造成数据无法正常读写。
- 使用HMS方式同步元数据到Hive
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.mode' = 'hms', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'properties.hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' );
父主题: 配置FlinkServer对接其他组件