更新时间:2022-12-14 GMT+08:00

FlinkServer对接Hudi

操作场景

本指南通过使用FlinkServer写FlinkSQL对接Hudi。

前提条件

  • 集群已安装HDFS、Yarn、Flink和Hudi等服务。
  • 包含Hudi服务的客户端已安装,例如安装路径为:/opt/Bigdata/client。
  • Flink要求1.12.2及以上版本,Hudi要求0.9.0及以上版本。
  • 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI,如:flink_admin。

Flink对Hudi表的读写支持

Flink对Hudi表的COW表、MOR表类型读写支持详情见表1

表1 Flink对Hudi表的读写支持

Flink SQL

COW表

MOR表

批量写

支持

支持

批量读

支持

支持

流式写

支持

支持

流式读

支持

支持

目前FlinkSQL对接Hudi仅支持Snapshot mode和Read Optimized mode两种模式。

操作步骤

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。并启动作业。

    需勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    • 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
    • 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
    • FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
    • FlinkSQL流式写入MOR表,仅支持kafka json格式。
      CREATE TABLE stream_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
      'table.type' = 'MERGE_ON_READ'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
      );
      
      insert into
      stream_mor
      select
      *
      from
      kafka;
    • FlinkSQL流式写入COW表
      CREATE TABLE stream_write_cow(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
      );
      
      insert into
      stream_write_cow
      select
      *
      from
      kafka;
    • FlinkSQL读取MOR表
      CREATE TABLE hudi_read_spark_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',  
      'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',  
      'table.type' = 'MERGE_ON_READ'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts timestamp(6)INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
      );
      
      insert into
      hudi_read_spark_mor
      select
      *
      from
      kafka;

    Kafka端口号:

    • 安全模式为“sasl.port”的值,默认为“21007”。
    • 非安全模式为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

      登录FusionInsigh Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

  3. FlinkSQL写入Hudi表数据后,通过Spark、Hive读该数据时,需要使用run_hive_sync_tool.sh将Hudi表数据同步到Hive中。同步方法请参考将Hudi表数据同步到Hive

    同步前需要保证不再新增分区,同步后新增的分区将不能被读取。