更新时间:2024-04-09 GMT+08:00
分享

FlinkServer对接Hudi

本章节适用于MRS 3.1.2及之后的版本。

操作场景

本指南通过使用FlinkServer写FlinkSQL对接Hudi。

前提条件

  • 集群已安装HDFS、Yarn、Flink和Hudi等服务。
  • 包含Hudi服务的客户端已安装,例如安装路径为:/opt/client
  • Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。
  • 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。

Flink对Hudi表的读写支持

Flink对Hudi表的COW表、MOR表类型读写支持详情见表1

表1 Flink对Hudi表的读写支持

Flink SQL

COW表

MOR表

批量写

支持

支持

批量读

支持

支持

流式写

支持

支持

流式读

支持

支持

操作步骤

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。并启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    • 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
    • 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
    • FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
    • FlinkSQL流式写入MOR表。
      CREATE TABLE stream_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
      'table.type' = 'MERGE_ON_READ'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      insert into
      stream_mor
      select
      *
      from
      kafka;
    • FlinkSQL流式写入COW表
      CREATE TABLE stream_write_cow(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      insert into
      stream_write_cow
      select
      *
      from
      kafka;
    • FlinkSQL读取MOR表
      CREATE TABLE hudi_read_spark_mor(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT,
      `p` VARCHAR(20)
      ) PARTITIONED BY (`p`) WITH (
      'connector' = 'hudi',  
      'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',  
      'table.type' = 'MERGE_ON_READ'
      );
      
      CREATE TABLE kafka(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts timestamp(6)INT,
      `p` VARCHAR(20)
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'writehudi',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      insert into
      kafka
      select
      *
      from
      hudi_read_spark_mor;

    Kafka端口号:

    • 集群的“认证模式”为“安全模式”时为“sasl.port”的值,默认为“21007”。
    • 集群的“认证模式”为“普通模式”时为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

      登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

  3. FlinkSQL写入Hudi表数据后,通过Spark、Hive读该数据时,需要使用run_hive_sync_tool.sh将Hudi表数据同步到Hive中。同步方法请参考将Hudi表数据同步到Hive

    同步前需要保证不再新增分区,同步后新增的分区将不能被读取。

Flink On Hudi同步元数据到Hive

适用于MRS 3.2.0及之后版本。
  • 使用JDBC方式同步元数据到Hive
    CREATE TABLE stream_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = '要同步到Hive的表名',
    'hive_sync.db' = '要同步到Hive的数据库名',
    'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值',
    'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值'
    );
    • hive_sync.jdbc_url:Hive客户端component_env文件中CLIENT_HIVE_URI的值,如果该值中存在“\”需将其删除。
    • 如果需要使用Hive风格分区,需同时配置如下参数:
      • 'hoodie.datasource.write.hive_style_partitioning' = 'true'
      • 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
    • Flink on Hudi并同步数据至Hive的任务,因为Hudi对大小写敏感,Hive对大小写不敏感,所以在Hudi表中的字段不建议使用大写字母,否则可能会造成数据无法正常读写。
  • 使用HMS方式同步元数据到Hive
    CREATE TABLE stream_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts INT,
    `p` VARCHAR(20)
    ) PARTITIONED BY (`p`) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = '要同步到Hive的表名',
    'hive_sync.db' = '要同步到Hive的数据库名',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值',
    'properties.hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值'
    );
分享:

    相关文档

    相关产品