FlinkServer对接Hudi
操作场景
本指南通过使用FlinkServer写FlinkSQL对接Hudi。
前提条件
- 集群已安装HDFS、Yarn、Hive、Spark、Flink和Kafka等服务。
- 包含Flink、Kafka服务的客户端已安装,例如安装路径为:/opt/client。
- Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。
- 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。并且用户需要添加hadoop、hive、kafkaadmin用户组,以及Manager_administrator角色。
Flink对Hudi表的读写支持
Flink对Hudi表的COW表、MOR表类型读写支持详情见表1。
操作步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。然后输入SQL,执行SQL校验通过后,启动作业。如下SQL示例将作为3个作业分别添加,依次运行。
所有作业均需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
建议开启故障恢复策略,提高作业可靠性。例如“故障恢复策略”选择“fixed-delay”,“重试次数”设置为“3”,“失败重试间隔”设置为“30”,重试次数和间隔可按实际业务需要填写。
作业启动,状态显示“运行中”后,可通过“更多 > 作业详情”跳转到Flink作业的原生UI页面,查看Job运行情况。
- 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
- 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
- FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
- 默认Hudi写表是Flink状态索引,如果需要使用bucket索引需要在Hudi写表中添加参数:
'index.type'='BUCKET', 'hoodie.bucket.index.num.buckets'='Hudi表中每个分区划分桶的个数', 'hoodie.bucket.index.hash.field'='recordkey.field'
- hoodie.bucket.index.num.buckets:Hudi表中每个分区划分桶的个数,每个分区内的数据通过Hash方式放入每个桶内。建表或第一次写入数据时设置后不能修改,否则更新数据会存在异常。
- hoodie.bucket.index.hash.field:进行分桶时计算Hash值的字段,必须为主键的子集,默认为Hudi表的主键。该参数不填则默认为recordkey.field。
- 对于同一张Hudi表,可以被Flink、Spark引擎的bucket索引交叉混写。
- 作业1:FlinkSQL流式写入MOR表。
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'write.tasks' = '4' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); insert into stream_mor select * from kafka;
- 作业2:FlinkSQL流式写入COW表
CREATE TABLE stream_write_cow( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_cow', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'write.tasks' = '4' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); insert into stream_write_cow select * from kafka;
- 作业3:FlinkSQL流式读取MOR和COW表并合并数据输出到Kafka。注意作业3需要等待作业1和作业2均启动后,状态显示为“运行中”后再执行SQL校验和启动作业(否则SQL校验可能提示错误,找不到Hudi表目录)。
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'read.tasks' = '4', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '5', 'read.streaming.start-commit' = 'earliest' ); CREATE TABLE stream_write_cow( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_cow', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'read.tasks' = '4', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '5', 'read.streaming.start-commit' = 'earliest' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'readhudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); insert into kafka select * from stream_mor union all select * from stream_write_cow;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- 参考管理Kafka主题中的消息,向kafka中写入数据。
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为“writehudi”:
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic writehudi --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:{"uuid": "1","name":"a01","age":10,"ts":10,"p":"1"} {"uuid": "2","name":"a02","age":20,"ts":20,"p":"2"}
输入完成后按回车发送消息。
- 消费kafka topic的数据,读取Flink流读Hudi表的结果。
sh kafka-console-consumer.sh --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --consumer.config 客户端目录/Kafka/kafka/config/consumer.properties --from-beginning
例如本示例使用主题名称为“readhudi”:
sh kafka-console-consumer.sh --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --topic readhudi --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning
读取结果如下(顺序不固定):
{"uuid": "1","name":"a01","age":10,"ts":10,"p":"1"} {"uuid": "2","name":"a02","age":20,"ts":20,"p":"2"} {"uuid": "1","name":"a01","age":10,"ts":10,"p":"1"} {"uuid": "2","name":"a02","age":20,"ts":20,"p":"2"}
WITH主要参数说明
方式 |
配置项 |
是否必选 |
默认值 |
描述 |
---|---|---|---|---|
读取 |
read.tasks |
否 |
4 |
读Hudi表task并行度 |
read.streaming.enabled |
否 |
false |
是否开启流读模式 |
|
read.streaming.start-commit |
否 |
默认从最新commit |
Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间) |
|
read.end-commit |
否 |
默认到最新commit |
Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的结束消费位置(闭区间) |
|
写入 |
write.tasks |
否 |
4 |
写Hudi表task并行度 |
index.bootstrap.enabled |
否 |
false |
是否开启索引加载,开启后会将已存表的最新数据一次性加载到state中。 如果有全量数据接增量的需求,且已经有全量的离线Hoodie表,需要接上实时写入,同时保证数据不重复,可以开启索引加载功能。 |
|
write.index_bootstrap.tasks |
否 |
4 |
如果启动作业时索引加载缓慢,可以调大该值,调大该值后可以加快bootstrap阶段的效率,但bootstrap阶段会阻塞CheckPoint |
|
compaction.async.enabled |
否 |
true |
是否开启在线压缩 |
|
compaction.schedule.enabled |
否 |
true |
是否阶段性生成压缩plan,即使关闭在线压缩的情况下也建议开启 |
|
compaction.tasks |
否 |
10 |
压缩Hudi表task并行度 |
|
index.state.ttl |
否 |
7D |
索引保存的时间,默认为7天(单位:天),小于“0”表示永久保存 索引是判断数据重复的核心数据结构,对于长时间的更新,比如更新一个月前的数据,需要将该值调大 |
Flink On Hudi同步元数据到Hive
启动此特性后,Flink写数据至Hudi表将自动在Hive上创建出Hudi表并同步添加分区,然后供SparkSQL、Hive等服务读取Hudi表数据。
如下是支持的两种同步元数据方式,后续操作步骤以JDBC方式为示例:
- 使用JDBC方式同步元数据到Hive
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值' );
- hive_sync.jdbc_url:Hive客户端component_env文件中CLIENT_HIVE_URI的值,若该值中存在“\”需将其删除。
- 若需要使用Hive风格分区,需同时配置如下参数:
- 'hoodie.datasource.write.hive_style_partitioning' = 'true'
- 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
- Flink on Hudi并同步数据至Hive的任务,因为Hudi对大小写敏感,Hive对大小写不敏感,所以在Hudi表中的字段不建议使用大写字母,否则可能会造成数据无法正常读写。
- 使用HMS方式同步元数据到Hive
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.mode' = 'hms', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'properties.hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' );
JDBC方式示例:
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。然后填入SQL,执行SQL校验通过后,启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE stream_mor2( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor2', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'write.tasks' = '4', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名,如stream_mor2', 'hive_sync.db' = '要同步到Hive的数据库名,如default', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值' ); CREATE TABLE datagen ( uuid varchar(20), name varchar(10), age int, ts INT, p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.p.length' = '1' );insert into stream_mor2 select * from datagen;
- 等待Flink作业运行一段时间,将datagen生成的随机测试数据持续写入Hudi表。可通过“更多 > 作业详情”跳转到Flink作业的原生UI页面,查看Job运行情况。
- 登录客户端所在节点,加载环境变量,执行beeline命令登录Hive客户端,执行SQL查看是否在Hive上成功创建Hudi Sink表,并且查询表是否可读出数据。
cd /opt/hadoopclient
source bigdata_env
beeline
desc formatted default.stream_mor2;
select * from default.stream_mor2 limit 5;
show partitions default.stream_mor2;