更新时间:2026-06-11 GMT+08:00
分享

FlinkServer对接Hudi

操作场景

本指南通过使用FlinkServer写FlinkSQL对接Hudi。FlinkSQL读写Hudi时,不支持定义TINYINT、SMALLINT和TIME类型。

前提条件

  • 集群已安装HDFS、Yarn、Hive、Spark、Flink和Kafka等服务。
  • 包含Flink、Kafka服务的客户端已安装,例如安装路径为:/opt/client
  • Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。并且用户需要添加hadoop、hive、kafkaadmin用户组,以及Manager_administrator角色。
  • 仅支持Flink与Hive组件共集群。

约束与限制

本章节适用于MRS 3.6.0-LTS.1及之后的版本。

MRS 3.6.0-LTS.1版本,MOR表已默认是BUCKET索引,建表时必须指定BUCKET桶数,桶数请参考《MapReduce服务(MRS)组件设计开发规范》的“确定表索引”章节的“确认BUCKET索引桶数(BUCKET索引表必须参考此内容预估桶数)”来计算。

操作步骤

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考Catalog管理创建一个Hudi Catalog,如:hudi_catalog。
  3. 新建Hudi表,具体操作如下:

    • 新建一个Hudi mor表,表名为hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,完整示例如下:
      CREATE TABLE  hudi_catalog.`default`.hudi_table(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'write.precombine.field' = 'ts',
      'index.type' = 'BUCKET',
      'hoodie.bucket.index.num.buckets' = '4',
      'write.tasks' = '4'
      );
    • 新建一个Hudi cow表,表名为hudi_catalog.`default`.hudi_cow_table,具体操作可参考数据管理,完整示例如下:
      CREATE TABLE  hudi_catalog.`default`.hudi_cow_table(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_cow',
      'hoodie.datasource.write.recordkey.field' = 'uuid',
      'write.precombine.field' = 'ts',
      'write.tasks' = '4'
      );
    • FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html。
    • MRS 3.6.0-LTS.1版本Flink使用catalog写hudi mor表需要指定'index.type',否则运行时会报错。

  4. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。然后输入SQL,执行SQL校验通过后,启动作业。如下SQL示例将作为3个作业分别添加,依次运行。

    所有作业均需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    建议开启故障恢复策略,提高作业可靠性。例如“故障恢复策略”选择“fixed-delay”,“重试次数”设置为“3”,“失败重试间隔”设置为“30”,重试次数和间隔可按实际业务需要填写。

    作业启动,状态显示“运行中”后,可通过“更多 > 作业详情”跳转到Flink作业的原生UI页面,查看Job运行情况。

    • 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
    • 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常,建议CheckPoint间隔为分钟级。
    1. 作业1:FlinkSQL流式写入MOR表
      CREATE TABLE datagen_source(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
      );
      
      insert into 
      hudi_catalog.`default`.hudi_table 
      select 
      * 
      from 
      datagen_source;
    2. 作业2:FlinkSQL流式写入COW表
      CREATE TABLE datagen_source(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
      );
      
      insert into 
      hudi_catalog.`default`.hudi_cow_table 
      select 
      * 
      from 
      datagen_source;
    3. 作业3:FlinkSQL流式读取MOR和COW表并合并数据输出到Kafka。注意作业3需要等待作业1和作业2均启动后,状态显示为“运行中”后再执行SQL校验和启动作业(否则SQL校验可能提示错误,找不到Hudi表目录)。
      CREATE TABLE print_sink(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'print'
      );
      
      insert into 
      print_sink
      select 
      * 
      from 
      hudi_catalog.`default`.hudi_table union all select * from hudi_catalog.`default`.hudi_cow_table;

  5. 单击左上角“提交”提交作业。
  6. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
  7. 进入作业详情界面,单击“Task Managers > Stdout”,查看是否正常打印表中数据。

FlinkSQL Lookup Join Hudi使用须知

适用于MRS 3.5.0及以后版本。

  • 使用lookup.join.cache.ttl参数来控制维表数据的加载周期,默认值为60min。
  • Hudi维表数据会被加载到Flink TaskManager Heap中,所以不推荐大于10万行记录的Hudi表作为维表。
  • 维表的新增、更新数据需要等到下一次加载周期后,才能被加载进来参与计算。
  • 新建一个Hudi表,例如:hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,完整示例如下:
    CREATE TABLE  hudi_catalog.`default`.hudi_table(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
     'connector' = 'hudi',   
     'path' = 'hdfs://hacluster/tmp/hudimor',   
     'table.type' = 'MERGE_ON_READ',   
     'hoodie.datasource.write.recordkey.field' = 'uuid',   
     'index.type' = 'BUCKET',
     'hoodie.bucket.index.num.buckets' = '4',
     'write.precombine.field' = 'ts',   
     'lookup.join.cache.ttl' = '60min'
    );
SQL示例如下:
CREATE TABLE datagen(uuid varchar(20), proctime as PROCTIME()) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE blackhole (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH ('connector' = 'blackhole');
insert into
  blackhole
select
  t1.uuid as uuid,
  t2.name as name,
  t2.age as age,
  t2.ts as ts,
  t2.p as p
FROM
  datagen AS t1
  left JOIN hudi_catalog.`default`.hudi_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.uuid = t2.uuid;

WITH主要参数说明

表1 WITH主要参数说明

方式

配置项

是否必选

默认值

描述

读取

read.tasks

4

读Hudi表task并行度。

read.streaming.enabled

false

是否开启流读模式。

read.streaming.start-commit

默认从最新commit

Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间)。

read.end-commit

默认到最新commit

Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的结束消费位置(闭区间)。

写入

write.tasks

4

写Hudi表task并行度。

index.bootstrap.enabled

false

是否开启索引加载,开启后会将已存表的最新数据一次性加载到state中。

如果有全量数据接增量的需求,且已经有全量的离线Hoodie表,需要接上实时写入,同时保证数据不重复,可以开启索引加载功能。

write.index_bootstrap.tasks

4

如果启动作业时索引加载缓慢,可以调大该值,调大该值后可以加快bootstrap阶段的效率,但bootstrap阶段会阻塞CheckPoint。

compaction.async.enabled

true

是否开启在线压缩。

compaction.schedule.enabled

true

是否阶段性生成压缩plan,即使关闭在线压缩的情况下也建议开启。

compaction.tasks

10

压缩Hudi表task并行度。

index.state.ttl

7D

索引保存的时间,默认为7天(单位:天),小于“0”表示永久保存。

索引是判断数据重复的核心数据结构,对于长时间的更新,比如更新一个月前的数据,需要将该值调大。

维表

lookup.join.cache.ttl

60min

Hudi作为维表时数据reload的时间周期。

FlinkSQL建Hudi表支持字段注释

通过Hudi Catalog创建的Hudi表,支持同步字段注释信息到Hive。

  1. 参考Catalog管理创建一个Hudi Catalog,如:hudi_catalog。
  2. 新建一个Hudi表,例如:hudi_catalog.`default`.hudi_table,具体操作可参考数据管理,SQL示例如下:

    CREATE TABLE hudi_catalog.`default`.hudi_table(
           id int comment '主键',
           name VARCHAR(20) comment '名字',
           age INT comment '年龄',
           `date` VARCHAR(20)
    ) PARTITIONED BY (`date`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi_mor',--hudi表存储路径
      'table.type' = 'MERGE_ON_READ',
      'hoodie.datasource.write.recordkey.field' = 'id',
      'hoodie.bucket.index.num.buckets'='BUCKET桶数量',
      'write.precombine.field' = 'age',
      'hoodie.datasource.write.hive_style_partitioning' = 'true',
      'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
      'index.type' =  'BUCKET',
      'hoodie.bucket.index.num.buckets' = '8',
      'write.tasks' = '8'
    );

更多参考

FlinkSQL读写Hudi表参数简化可参考Hudi创建表参数简化指导

相关文档