更新时间:2026-06-11 GMT+08:00
分享

FlinkServer对接Hive

操作场景

目前FlinkServer对接Hive使用对接MetaStore的方式,所以需要Hive开启MetaStore功能。Hive可以作为Source、Sink和维表。

本示例以安全模式Kafka为例。

前提条件

  • 集群已安装HDFS、Yarn、Kafka、Flink和Hive(且服务名称必须为Hive)等服务。
  • 包含Hive服务的客户端已安装,安装路径如:/opt/client
  • Flink支持1.12.2及以后版本,Hive支持3.1.0及以后版本。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
  • 参考Catalog管理创建一个Hive Catalog,如:myhive。
  • 参考创建FlinkServer集群连接中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。

FlinkServer不支持跨集群访问Hive,不支持集群中存在多个Hive服务。

约束与限制

本章节适用于MRS 3.6.0-LTS.1及之后的版本。

操作步骤

  1. 使用flink_admin访问Flink WebUI,请参考访问FlinkServer WebUI界面
  2. 参考Catalog管理创建一个Hive Catalog,如:hive_catalog。
  3. 新建一个Hive表,例如:hive_catalog.`default`.hive_table,具体操作可参考数据管理,完整示例如下:

    create table if not exists hive_catalog.`default`.hive_table (
      user_id STRING,
      item_id STRING,
      cat_id STRING,
      ts timestamp(9),
      dy STRING,
      ho STRING,
      mi STRING
    ) PARTITIONED BY (dy, ho, mi)
    with
      (
        'connector' = 'hive',
        'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '0S',
        'sink.partition-commit.policy.kind' = 'metastore,success-file'
      );

  4. 新建Flink SQL流作业,如:flinktest1。

    1. 单击“作业管理”进入作业管理页面。
    2. 单击“新建作业”,在新建作业页面参考表1填写信息,单击“确定”,创建作业成功并进入作业开发界面。
      表1 新建作业信息

      参数名称

      参数描述

      取值样例

      类型

      作业类型,包括Flink SQL和Flink Jar。

      Flink SQL

      名称

      作业名称,只能包含英文字母、数字和下划线,且不能多于64个字符。

      flinktest1

      作业类型

      作业数据来源类型,包括流作业和批作业。

      流作业

      描述

      作业描述,不能超过100个字符。

      -

    3. 在作业开发界面进行作业开发,输入SQL语句,可以单击上方“语义校验”对输入内容校验。
      • Hive作为Sink表
        1. 流式写入Hive表。
          CREATE TABLE test_datagen (
            user_id varchar,
            item_id varchar,
            cat_id varchar,
            zw_test timestamp
          ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1'
          );
          INSERT into
            hive_catalog.`default`.hive_table
          SELECT
            user_id,
            item_id,
            cat_id,
            zw_test,
            DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
            DATE_FORMAT(zw_test, 'HH'),
            DATE_FORMAT(zw_test, 'mm')
          FROM
            test_datagen;
        2. 单击左上角“提交”提交作业。
        3. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
        4. 执行以下命令查看Sink表中是否接收到数据,即Hive表是否正常写入数据。
          beeline
          select * from hive_table;
      • Hive作为Source表
        1. 流式读取Hive表。
          CREATE TABLE print_sink(
          user_id varchar,
          item_id varchar,
          cat_id varchar
          ) WITH (
          'connector' = 'print'
          );
          insert into
          print_sink
          select
          user_id,
          item_id,
          cat_id
          from
          hive_catalog.`default`.hive_table;
        2. 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
        3. 单击左上角“提交”提交作业。
        4. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
        5. 进入作业详情界面,单击“Task Managers > Stdout”,查看是否正常打印hive_table表中的数据。
      • Hive作为维表
        1. Hive作为维表。
          CREATE TABLE datagen_source (
            user_id string, 
            address string, 
            proctime as PROCTIME()
          ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1'
          );
          CREATE TABLE print_sink(
            user_id  string,
            address string,
            cat_id string
          ) WITH (
            'connector' = 'print'
          );
          INSERT INTO
            default_catalog.default_database.print_sink
          select
            t1.user_id ,
            t1.address,
            t2.cat_id
          from
            default_catalog.default_database.datagen_source as t1
            join hive_catalog.`default`.hive_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id  = t2.user_id ;
        2. 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
        3. 单击左上角“提交”提交作业。
        4. 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
        5. 进入作业详情界面,单击“Task Managers > Stdout”,即可查看打印的结果数据。

相关文档