更新时间:2022-12-14 GMT+08:00

FlinkServer对接Hive

操作场景

目前FlinkServer对接Hive使用对接metaStore的方式,所以需要Hive开启MetaStore功能。Hive可以作为source,sink和维表。

本示例以安全模式Kafka为例。

前提条件

  • 集群已安装HDFS、Yarn、Kafka、Flink和Hive等服务。
  • 包含Hive服务的客户端已安装,安装路径如:/opt/Bigdata/client。
  • Flink支持1.12.2及以上版本,Hive支持3.1.0及以上版本。
  • 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI,如:flink_admin。
  • 参考创建集群连接中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。

操作步骤

以映射表类型为Kafka对接Hive流程为例。

  1. 使用flink_admin访问Flink WebUI,请参考访问Flink WebUI
  2. 新建集群连接,如:flink_hive。

    1. 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。
    2. 单击“创集集群连接”,在弹出的页面中参考表1填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。
      表1 创建集群连接信息

      参数名称

      参数描述

      取值样例

      集群连接名称

      集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。

      flink_hive

      描述

      集群连接名称描述信息。

      -

      版本

      选择集群版本。

      MRS 3

      是否安全版本

      • 是,安全集群选择是。需要输入访问用户名和上传用户凭证;
      • 否,非安全集群选择否。

      访问用户名

      访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。

      “是否安全版本”选择“是”时存在此参数。

      flink_admin

      客户端配置文件

      集群客户端配置文件,格式为tar。

      -

      用户凭据

      FusionInsight Manager中用户的认证凭据,格式为tar。

      “是否安全版本”选择“是”时存在此参数。

      输入访问用户名后才可上传文件。

      flink_admin的用户凭据

  3. 新建Flink SQL流作业,如:flinktest1。

    1. 单击“作业管理”进入作业管理页面。
    2. 单击“新建作业”,在新建作业页面参考表2填写信息,单击“确定”,创建作业成功并进入作业开发界面。
      表2 新建作业信息

      参数名称

      参数描述

      取值样例

      类型

      作业类型,包括Flink SQL和Flink Jar。

      Flink SQL

      名称

      作业名称,只能包含英文字母、数字和下划线,且不能多于64个字符。

      flinktest1

      作业类型

      作业数据来源类型,包括流作业和批作业。

      流作业

      描述

      作业描述,不能超过100个字符。

      -

  4. 在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。

    CREATE TABLE test_kafka (
      user_id varchar,
      item_id varchar,
      cat_id varchar,
      zw_test timestamp
    ) WITH (
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'format' = 'json',
      'topic' = 'zw_tset_kafka',
      'connector' = 'kafka',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    
    );
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-version' = '3.1.0',
      'default-database' = 'default',
      'cluster.name' = 'flink_hive'
    );
    use catalog myhive;
    set table.sql-dialect = hive;create table user_behavior_hive_tbl_no_partition (
        user_id STRING,
        item_id STRING,
        cat_id STRING,
        ts timestamp
      ) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as textfile TBLPROPERTIES (
        'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '0S',
        'sink.partition-commit.policy.kind' = 'metastore,success-file'
      );
    INSERT into
      user_behavior_hive_tbl_no_partition
    SELECT
      user_id,
      item_id,
      cat_id,
      zw_test,
      DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
      DATE_FORMAT(zw_test, 'HH'),
      DATE_FORMAT(zw_test, 'mm')
    FROM
      default_catalog.default_database.test_kafka;
    • Kafka端口号:
      • 安全模式为“sasl.port”的值,默认为“21007”。
      • 非安全模式为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 'cluster.name' = 'flink_hive'的值为2新建的集群连接名称。
    • 相关参数可参考Flink官网:http://flink.apache.org/

  5. 作业SQL开发完成后,请勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
  6. 单击左上角“提交”提交作业。
  7. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
  8. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据。

    ./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为zw_tset_kafka:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic zw_tset_kafka --producer.config /opt/Bigdata/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    {"user_id": "3","item_id":"333333","cat_id":"cat333","zw_test":"2021-09-08 09:08:01"}
    {"user_id": "4","item_id":"444444","cat_id":"cat444","zw_test":"2021-09-08 09:08:01"} 

    输入完成后按回车发送消息。

    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。默认为24002。

  9. 执行以下命令查看Sink表中是否接收到数据,即Hive表是否正常写入数据。

    beeline

    select * from user_behavior_hive_tbl_no_partition;