更新时间:2024-10-28 GMT+08:00

创建FlinkServer作业写入数据至Hive表

本章节适用于MRS 3.1.2及之后的版本。

操作场景

目前FlinkServer对接Hive使用对接metaStore的方式,所以需要Hive开启MetaStore功能。Hive可以作为sink和维表。

本示例以安全模式Kafka为例。

前提条件

  • 集群已安装HDFS、Yarn、Kafka、Flink和Hive(且服务名称必须为Hive)等服务。
  • 包含Hive服务的客户端已安装,安装路径如:/opt/client
  • Flink支持1.12.2及以后版本,Hive支持3.1.0及以后版本。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
  • 参考创建FlinkServer集群连接步骤中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。

创建作业步骤

场景一:Hive作为Sink表。

以映射表类型为Kafka对接Hive流程为例。

  1. 使用flink_admin访问Flink WebUI,请参考访问FlinkServer WebUI界面
  2. 新建集群连接,如:flink_hive。

    1. 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。
    2. 单击“创建集群连接”,在弹出的页面中参考表1填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。
      表1 创建集群连接信息

      参数名称

      参数描述

      取值样例

      集群连接名称

      集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。

      flink_hive

      描述

      集群连接名称描述信息。

      -

      版本

      选择集群版本。

      MRS 3

      是否安全版本

      • 是,安全集群选择是。需要输入访问用户名和上传用户凭证;
      • 否,非安全集群选择否。

      访问用户名

      访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。

      “是否安全版本”选择“是”时存在此参数。

      flink_admin

      客户端配置文件

      集群客户端配置文件,格式为tar。

      -

      用户凭据

      FusionInsight Manager中用户的认证凭据,格式为tar。

      “是否安全版本”选择“是”时存在此参数。

      输入访问用户名后才可上传文件。

      flink_admin的用户凭据

  3. 新建Flink SQL流作业,如:flinktest1。

    1. 单击“作业管理”进入作业管理页面。
    2. 单击“新建作业”,在新建作业页面参考表2填写信息,单击“确定”,创建作业成功并进入作业开发界面。
      表2 新建作业信息

      参数名称

      参数描述

      取值样例

      类型

      作业类型,包括Flink SQL和Flink Jar。

      Flink SQL

      名称

      作业名称,只能包含英文字母、数字和下划线,且不能多于64个字符。

      flinktest1

      作业类型

      作业数据来源类型,包括流作业和批作业。

      流作业

      描述

      作业描述,不能超过100个字符。

      -

  4. 在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。

    CREATE TABLE test_kafka (
      user_id varchar,
      item_id varchar,
      cat_id varchar,
      zw_test timestamp
    ) WITH (
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'format' = 'json',
      'topic' = 'zw_tset_kafka',
      'connector' = 'kafka',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
    
    );
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-version' = '3.1.0',
      'default-database' = 'default',
      'cluster.name' = 'flink_hive'
    );
    use catalog myhive;
    set table.sql-dialect = hive;create table user_behavior_hive_tbl (
        user_id STRING,
        item_id STRING,
        cat_id STRING,
        ts timestamp
      ) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as textfile TBLPROPERTIES (
        'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '0S',
        'sink.partition-commit.policy.kind' = 'metastore,success-file'
      );
    INSERT into
      user_behavior_hive_tbl
    SELECT
      user_id,
      item_id,
      cat_id,
      zw_test,
      DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
      DATE_FORMAT(zw_test, 'HH'),
      DATE_FORMAT(zw_test, 'mm')
    FROM
      default_catalog.default_database.test_kafka;
    • Kafka端口号:
      • 集群的“认证模式”为“安全模式”时为“sasl.port”的值,默认为“21007”。
      • 集群的“认证模式”为“普通模式”时为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 'cluster.name' = 'flink_hive'的值为2新建的集群连接名称。
    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

  5. 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
  6. 单击左上角“提交”提交作业。
  7. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
  8. 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。

    ./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为zw_tset_kafka:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic zw_tset_kafka --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    {"user_id": "3","item_id":"333333","cat_id":"cat333","zw_test":"2021-09-08 09:08:01"}
    {"user_id": "4","item_id":"444444","cat_id":"cat444","zw_test":"2021-09-08 09:08:01"} 

    输入完成后按回车发送消息。

  9. 执行以下命令查看Sink表中是否接收到数据,即Hive表是否正常写入数据。

    beeline

    select * from user_behavior_hive_tbl;

场景二:Hive作为维表。

  1. 参考Hive客户端使用实践进入Hive客户端,创建Hive表并插入数据 。

    CREATE TABLE hive3 ( id  int, name string ) PARTITIONED BY (dy STRING,ho STRING,mi STRING) 
    STORED AS textfile TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern'='$dy $ho:$mi:00',
    'streaming-source.partition.include'='all',
    'streaming-source.enable'='true',
    'streaming-source.partition.include'='all',
    'streaming-source.monitor-interval'='2m');
    
    insert into table hive3  values('1','aname','sss','name1','company1');
    insert into table hive3  values('2','bname','sss','name1','company1');

  2. 使用flink_admin访问Flink WebUI,请参考访问FlinkServer WebUI界面
  3. 新建集群连接,如:flink_hive1。

    1. 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。
    2. 单击“创建集群连接”,在弹出的页面中参考表3填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。
      表3 创建集群连接信息

      参数名称

      参数描述

      取值样例

      集群连接名称

      集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。

      flink_hive1

      描述

      集群连接名称描述信息。

      -

      版本

      选择集群版本。

      MRS 3

      是否安全版本

      • 是,安全集群选择是。需要输入访问用户名和上传用户凭证;
      • 否,非安全集群选择否。

      访问用户名

      访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。

      “是否安全版本”选择“是”时存在此参数。

      flink_admin

      客户端配置文件

      集群客户端配置文件,格式为tar。

      -

      用户凭据

      FusionInsight Manager中用户的认证凭据,格式为tar。

      “是否安全版本”选择“是”时存在此参数。

      输入访问用户名后才可上传文件。

      flink_admin的用户凭据

  4. 新建Flink SQL流作业,如:flinktest2。

    1. 单击“作业管理”进入作业管理页面。
    2. 单击“新建作业”,在新建作业页面参考表4填写信息,单击“确定”,创建作业成功并进入作业开发界面。
      表4 新建作业信息

      参数名称

      参数描述

      取值样例

      类型

      作业类型,包括Flink SQL和Flink Jar。

      Flink SQL

      名称

      作业名称,只能包含英文字母、数字和下划线,且不能多于64个字符。

      flinktest2

      作业类型

      作业数据来源类型,包括流作业和批作业。

      流作业

      描述

      作业描述,不能超过100个字符。

      -

  5. 在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。

    CREATE TABLE kafka_source (
      id int, 
      address string, 
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_source',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
    );
    CREATE TABLE kafka_sink(
      id int,
      address string,
      name string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_sink',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
    );
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-version' = '3.1.0',
      'default-database' = 'default',
      'cluster.name' = 'flink_hive1'
    );
    use catalog myhive;
    set
      table.sql-dialect = hive;
    create table if not exists hive3 (id int, name string) stored as textfile TBLPROPERTIES (
      'streaming-source.enable' = 'false',
      'streaming-source.partition.include' = 'all',
      'lookup.join.cache.ttl' = '5 min'
    );
    INSERT INTO
      default_catalog.default_database.kafka_sink
    select
      t1.id,
      t1.address,
      t2.name
    from
      default_catalog.default_database.kafka_source as t1
      join hive3 FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.id = t2.id;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 'cluster.name' = 'flink_hive'的值为3新建的集群连接名称。
    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

  6. 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
  7. 单击左上角“提交”提交作业。
  8. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
  9. 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。

    ./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    输入消息内容:
    1,city1
    2,city2

    输入完成后按回车发送消息。

  10. 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即9执行完成后查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic test_sink --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

    结果如下:

    1,city1,aname
    2,city2,bname