更新时间:2024-11-29 GMT+08:00

FlinkServer对接Hudi

操作场景

本指南通过使用FlinkServer写FlinkSQL对接Hudi。

前提条件

  • 集群已安装HDFS、Yarn、Hive、Spark、Flink和Kafka等服务。
  • 包含Flink、Kafka服务的客户端已安装,例如安装路径为:/opt/client
  • Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。
  • 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。并且用户需要添加hadoop、hive、kafkaadmin用户组,以及Manager_administrator角色。

Flink对Hudi表的读写支持

Flink对Hudi表的COW表、MOR表类型读写支持详情见表1

表1 Flink对Hudi表的读写支持

Flink SQL

COW表

MOR表

批量写

支持

支持

批量读

支持

支持

流式写

支持

支持

流式读

支持

支持

操作步骤

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。然后输入SQL,执行SQL校验通过后,启动作业。如下SQL示例将作为3个作业分别添加,依次运行。

    所有作业均需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    建议开启故障恢复策略,提高作业可靠性。例如“故障恢复策略”选择“fixed-delay”,“重试次数”设置为“3”,“失败重试间隔”设置为“30”,重试次数和间隔可按实际业务需要填写。

    作业启动,状态显示“运行中”后,可通过“更多 > 作业详情”跳转到Flink作业的原生UI页面,查看Job运行情况。

    • 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
    • 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
    • FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
    • 默认Hudi写表是Flink状态索引,如果需要使用bucket索引需要在Hudi写表中添加参数:
      'index.type'='BUCKET',
      'hoodie.bucket.index.num.buckets'='Hudi表中每个分区划分桶的个数',
      'hoodie.bucket.index.hash.field'='recordkey.field'
      • hoodie.bucket.index.num.buckets:Hudi表中每个分区划分桶的个数,每个分区内的数据通过Hash方式放入每个桶内。建表或第一次写入数据时设置后不能修改,否则更新数据会存在异常。
      • hoodie.bucket.index.hash.field:进行分桶时计算Hash值的字段,必须为主键的子集,默认为Hudi表的主键。该参数不填则默认为recordkey.field。
    • 对于同一张Hudi表,可以被Flink、Spark引擎的bucket索引交叉混写。
    1. 作业1:FlinkSQL流式写入MOR表。
      CREATE TABLE stream_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'write.precombine.field' = 'ts',
      'write.tasks' = '4'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      insert into 
      stream_mor 
      select 
      * 
      from 
      kafka;
    2. 作业2:FlinkSQL流式写入COW表
      CREATE TABLE stream_write_cow(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_cow',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'write.precombine.field' = 'ts',
      'write.tasks' = '4'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      insert into 
      stream_write_cow 
      select 
      * 
      from 
      kafka;
    3. 作业3:FlinkSQL流式读取MOR和COW表并合并数据输出到Kafka。注意作业3需要等待作业1和作业2均启动后,状态显示为“运行中”后再执行SQL校验和启动作业(否则SQL校验可能提示错误,找不到Hudi表目录)。
      CREATE TABLE stream_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',  
      'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',  
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'write.precombine.field' = 'ts',
      'read.tasks' = '4',
      'read.streaming.enabled' = 'true',
      'read.streaming.check-interval' = '5',
      'read.streaming.start-commit' = 'earliest'
      );
      CREATE TABLE stream_write_cow(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_cow',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'write.precombine.field' = 'ts',
      'read.tasks' = '4',
      'read.streaming.enabled' = 'true',
      'read.streaming.check-interval' = '5',
      'read.streaming.start-commit' = 'earliest'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'readhudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      insert into 
      kafka 
      select 
      * 
      from 
      stream_mor union all select * from stream_write_cow;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

  3. 参考管理Kafka主题中的消息,向kafka中写入数据。

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为“writehudi”:

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic writehudi --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    {"uuid": "1","name":"a01","age":10,"ts":10,"p":"1"}
    {"uuid": "2","name":"a02","age":20,"ts":20,"p":"2"}

    输入完成后按回车发送消息。

  4. 消费kafka topic的数据,读取Flink流读Hudi表的结果。

    sh kafka-console-consumer.sh --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --consumer.config 客户端目录/Kafka/kafka/config/consumer.properties --from-beginning

    例如本示例使用主题名称为“readhudi”:

    sh kafka-console-consumer.sh --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --topic readhudi --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning

    读取结果如下(顺序不固定):

    {"uuid": "1","name":"a01","age":10,"ts":10,"p":"1"}
    {"uuid": "2","name":"a02","age":20,"ts":20,"p":"2"}
    {"uuid": "1","name":"a01","age":10,"ts":10,"p":"1"}
    {"uuid": "2","name":"a02","age":20,"ts":20,"p":"2"}

WITH主要参数说明

表2 WITH主要参数说明

方式

配置项

是否必选

默认值

描述

读取

read.tasks

4

读Hudi表task并行度

read.streaming.enabled

false

是否开启流读模式

read.streaming.start-commit

默认从最新commit

Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间)

read.end-commit

默认到最新commit

Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的结束消费位置(闭区间)

写入

write.tasks

4

写Hudi表task并行度

index.bootstrap.enabled

false

是否开启索引加载,开启后会将已存表的最新数据一次性加载到state中。

如果有全量数据接增量的需求,且已经有全量的离线Hoodie表,需要接上实时写入,同时保证数据不重复,可以开启索引加载功能。

write.index_bootstrap.tasks

4

如果启动作业时索引加载缓慢,可以调大该值,调大该值后可以加快bootstrap阶段的效率,但bootstrap阶段会阻塞CheckPoint

compaction.async.enabled

true

是否开启在线压缩

compaction.schedule.enabled

true

是否阶段性生成压缩plan,即使关闭在线压缩的情况下也建议开启

compaction.tasks

10

压缩Hudi表task并行度

index.state.ttl

7D

索引保存的时间,默认为7天(单位:天),小于“0”表示永久保存

索引是判断数据重复的核心数据结构,对于长时间的更新,比如更新一个月前的数据,需要将该值调大

Flink On Hudi同步元数据到Hive

启动此特性后,Flink写数据至Hudi表将自动在Hive上创建出Hudi表并同步添加分区,然后供SparkSQL、Hive等服务读取Hudi表数据。

如下是支持的两种同步元数据方式,后续操作步骤以JDBC方式为示例:

  • 使用JDBC方式同步元数据到Hive
    CREATE TABLE stream_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = '要同步到Hive的表名',
    'hive_sync.db' = '要同步到Hive的数据库名',
    'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值',
    'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值'
    );
    • hive_sync.jdbc_url:Hive客户端component_env文件中CLIENT_HIVE_URI的值,若该值中存在“\”需将其删除。
    • 若需要使用Hive风格分区,需同时配置如下参数:
      • 'hoodie.datasource.write.hive_style_partitioning' = 'true'
      • 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
    • Flink on Hudi并同步数据至Hive的任务,因为Hudi对大小写敏感,Hive对大小写不敏感,所以在Hudi表中的字段不建议使用大写字母,否则可能会造成数据无法正常读写。
  • 使用HMS方式同步元数据到Hive
    CREATE TABLE stream_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = '要同步到Hive的表名',
    'hive_sync.db' = '要同步到Hive的数据库名',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值',
    'properties.hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值'
    );

JDBC方式示例:

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。然后填入SQL,执行SQL校验通过后,启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    CREATE TABLE stream_mor2(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor2',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'uuid',
    'write.precombine.field' = 'ts',
    'write.tasks' = '4',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = '要同步到Hive的表名,如stream_mor2',
    'hive_sync.db' = '要同步到Hive的数据库名,如default',
    'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值',
    'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值'
    );
    CREATE TABLE datagen (
    uuid varchar(20), name varchar(10), age int, ts INT, p varchar(20)
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.p.length' = '1'
    );insert into stream_mor2 select * from datagen;

  3. 等待Flink作业运行一段时间,将datagen生成的随机测试数据持续写入Hudi表。可通过“更多 > 作业详情”跳转到Flink作业的原生UI页面,查看Job运行情况。
  4. 登录客户端所在节点,加载环境变量,执行beeline命令登录Hive客户端,执行SQL查看是否在Hive上成功创建Hudi Sink表,并且查询表是否可读出数据。

    cd /opt/hadoopclient

    source bigdata_env

    beeline

    desc formatted default.stream_mor2;

    select * from default.stream_mor2 limit 5;

    show partitions default.stream_mor2;