更新时间:2022-12-14 GMT+08:00

FlinkServer对接HDFS

操作场景

本章节介绍HDFS作为sink表的DDL定义,以及创建sink表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。

本示例以安全模式Kafka为例。

前提条件

  • 集群中已安装HDFS、Yarn、Flink服务。
  • 包含HDFS服务的客户端已安装,安装路径如:/opt/Bigdata/client。
  • 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI,如:flink_admin。

操作步骤

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
    CREATE TABLE kafka_table (
      user_id STRING,
      order_amount DOUBLE,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_source',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
       --跳过解析失败的csv数据
      'csv.ignore-parse-errors' = 'true' ,--如果是json数据格式,设置'json.ignore-parse-errors' = 'true'
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    
    );
    
    CREATE TABLE fs_table (
      user_id STRING,
      order_amount DOUBLE,
      dt STRING,
      `hour` STRING
    ) PARTITIONED BY (dt, `hour`) WITH ( --根据日期进行文件分区
      'connector'='filesystem',
      'path'='hdfs:///sql/parquet',
      'format'='parquet',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='success-file'
    );
    -- streaming sql, insert into file system table
    INSERT INTO fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

    Kafka端口号:

    • 安全模式为“sasl.port”的值,默认为“21007”。
    • 非安全模式为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

      登录FusionInsigh Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

  3. 查看作业管理界面,作业状态为“运行中”。
  4. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据。

    ./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为user_source:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic user_source --producer.config /opt/Bigdata/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    3,3333,"2021-09-10 14:00"
    4,4444,"2021-09-10 14:01"

    输入完成后按回车发送消息。

    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。默认为24002。

  5. 执行以下命令查看Sink表中是否接收到数据,即HDFS目录是否正常写入文件。

    hdfs dfs -ls -R /sql/parquet

Flink对接HDFS分区

  • Flink对接HDFS支持自定义分区。

    Flink文件系统分区支持使用标准的Hive格式。不需要将分区预先注册到表目录中,分区是根据目录结构推断。

    例如,根据下面的目录分区的表将被推断为包含日期时间和小时分区。
    path
    └── datetime=2021-09-03
        └── hour=11
            ├── part-0.parquet
            ├── part-1.parquet
        └── hour=12
            ├── part-0.parquet
    └── datetime=2021-09-24
        └── hour=6
            ├── part-0.parquet
  • 分区文件的滚动策略。

    分区目录中的数据被拆分为part文件,每个分区将至少包含一个part文件,用于接收sink的子任务的数据写入。

    如下参数介绍分区文件如何进行滚动。

    配置项

    默认值

    类型

    描述

    sink.rolling-policy.file-size

    128MB

    MemorySize

    分区文件达到该阈值后,进行滚动。

    sink.rolling-policy.rollover-interval

    30min

    Duration

    分区文件在滚动前可以保持打开的最长持续时间。

    sink.rolling-policy.check-interval

    1min

    Duration

    检查基于时间的滚动策略的时间间隔。

  • 分区目录的文件合并。
    支持文件压缩,允许应用程序具有更小的检查点间隔,而无需生成大量文件。

    仅压缩单个检查点中的文件,即生成的文件数量至少与检查点数量相同。合并前的文件是不可见的,因此文件的可见性是:检查点间隔+压缩时间之后。如果压缩时间太长,将延长检查点的时间段。

    配置项

    默认值

    类型

    描述

    auto-compaction

    false

    Boolean

    是否启用自动压缩。数据将写入临时文件。检查点完成后,检查点生成的临时文件将被压缩。压缩前临时文件不可见。

    compaction.file-size

    none

    MemorySize

    压缩目标文件大小,默认值为滚动文件大小。

  • 分区文件的提交。
    文件写入分区后,通常需要通知下游应用程序。如将分区添加到Hive元存储中,或在目录中写入_SUCCESS文件。分区文件的提交操作基于触发器和策略的组合方式。
    • 分区文件提交触发器相关配置

      配置项

      默认值

      类型

      描述

      sink.partition-commit.trigger

      process-time

      String

      • process-time:基于计算节点的系统时间,它既不需要分区时间提取,也不需要生成watermark。即“当前系统时间”超过“分区创建时的系统时间”加上“延迟”时间,就提交分区。
      • partition-time:基于从分区提取的时间,它需要生成watermark。即“watermark时间”超过“从分区提取的时间”加上“延迟”时间,就提交分区。

      sink.partition-commit.delay

      0 s

      Duration

      分区在延迟时间之前不会提交。如果是每日分区,则应为“1 d”,如果是每小时分区,则应为“1 h”。

    • 分区问文件提交策略相关配置

      配置项

      默认值

      类型

      描述

      sink.partition-commit.policy.kind

      -

      String

      提交分区的策略。

      • metastore:将分区添加到元存储。只有hive表支持元存储策略,文件系统通过目录结构管理分区。
      • success-file:将success-file文件添加到目录中。
      • 两者可以同时配置,即:'sink.partition-commit.policy.kind'='metastore,success-file'。

      sink.partition-commit.policy.class

      -

      String

      用于实现分区提交策略接口的分区提交策略类。

      仅在自定义提交策略中生效。

      sink.partition-commit.success-file.name

      _SUCCESS

      String

      success-file分区提交策略的文件名,默认值为_SUCCESS。