更新时间:2024-04-19 GMT+08:00

Hive源表

简介

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink与Hive的集成包含两个层面,一是利用了Hive的MetaStore作为持久化的Catalog,二是利用Flink来读写Hive的表。Overview | Apache Flink

从Flink 1.11.0开始,在使用 Hive方言时,Flink允许用户用Hive语法来编写SQL语句。通过提供与Hive语法的兼容性,改善与Hive的互操作性,并减少用户需要在Flink和Hive之间切换来执行不同语句的情况。详情可参考:Apache Flink Hive 方言

使用HiveCatalog,Apache Flink可以用于统一处理Apache Hive表的BATCH和STREAM。Flink可以作为Hive批处理引擎的更高效的替代方案,或者用于连续读写Hive表,以支持实时数据仓库应用程序。Apache Flink Hive Read & Write

功能描述

本节介绍利用Flink来读写Hive的表。Hive源表的定义,以及创建源表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write

Flink支持在BATCH 和 STREAMING模式下从Hive读取数据。当作为BATCH应用程序运行时,Flink将在执行查询的时间点对表的状态执行查询。STREAMING读取将持续监控表,并在新数据可用时以增量方式获取新数据。默认情况下,Flink会读取有界的表。

STREAMING读取支持同时使用分区表和非分区表。对于分区表,Flink将监控新分区的生成,并在可用时增量读取它们。对于未分区的表,Flink 会监控文件夹中新文件的生成情况,并增量读取新文件。

前提条件

该场景作业需要建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 数据类型的使用,请参考Format章节。
  • Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。
    • 使用Hive语法创建OBS表
      • defalut方言: with 属性中需要设置hive.is-external为true。
      • 使用hive 方言:建表语句需要使用EXTERNAL关键字。
    • 使用hive语法的DLI Lakehouse表
      • 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。
  • 开启checkpoint功能。
  • 建议切换到Hive方言来创建Hive兼容表。如果你想用默认的方言创建Hive兼容表,确保在你的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。注意,如果你使用Hive方言,就不需要connector属性了。
  • 监视策略是扫描当前位置路径中的所有目录/文件。许多分区可能会导致性能下降。
  • 对未分区表进行流式读取时,要求将每个文件以原子方式写入目标目录。
  • 分区表的流式读取要求在 hive 元存储的视图中以原子方式添加每个分区。否则,将使用添加到现有分区的新数据。
  • 流式读取不支持 Flink DDL 中的水印语法。这些表不能用于窗口运算符。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CREATE EXTERNAL TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [
    [ROW FORMAT row_format]
    [STORED AS file_format]
  ]
  [LOCATION obs_path]
  [TBLPROPERTIES (property_name=property_value, ...)]

row_format:
  : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char]
      [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
      [NULL DEFINED AS char]
  | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)]

file_format:
  : SEQUENCEFILE
  | TEXTFILE
  | RCFILE
  | ORC
  | PARQUET
  | AVRO
  | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname

column_constraint:
  : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]

table_constraint:
  : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]

参数说明

请参考使用Hive语法创建OBS表,和Hive 文档了解每个DDL语句的语义。

表1 TBLPROPERTIES 参数说明

参数

是否必选

默认参数

数据类型

说明

streaming-source.enable

false

Boolean

是否启用流源。 注意: 请确保每个分区/文件都应该以原子方式写入,否则读取器可能会得到不完整的数据。

streaming-source.partition.include

all

String

设置分区读取的选项,支持的选项是 'all' 和 'latest'。默认情况下,该选项为 'all' 。

'all' 表示读取所有分区;

'latest'仅在流式处理 Hive 源表用作temporal table时才有效。'latest' 表示按'streaming-source.partition.order'的顺序读取最新的分区。

Flink 支持对最新的 hive 分区进行临时连接,通过启用 'streaming-source.enable',并将 'streaming-source.partition.include' 设置为 'latest'。同时,用户可以通过配置以下分区相关选项来分配分区比较顺序和数据更新间隔。

streaming-source.monitor-interval

None

Duration

连续监视分区/文件的时间间隔。注意:Hive 流式处理读取的默认间隔为'1 min',Hive 流式处理temporal join的默认间隔为 '60 min',这是因为在当前 Hive 流式处理临时联接实现中,每个 TM 都会访问 Hive metaStore,这可能会对 metaStore 产生压力,这将在未来得到改善。

streaming-source.partition-order

partition-name

String

流源的分区顺序,支持 create-time、partition-time 和 partition-name。

create-time 比较分区/文件创建时间,这不是 Hive metaStore 中的分区创建时间,而是文件系统中的文件夹/文件修改时间,如果分区文件夹以某种方式更新,例如将新文件添加到文件夹中,可能会影响数据的使用方式。

partition-time 比较从分区名称中提取的时间。

partition-name 比较分区名称的字母顺序。

对于非分区表,此值应始终为“create-time”。

默认情况下,该值为 partition-name。该选项与已弃用的选项“streaming-source.consume-order”相等。

streaming-source.consume-start-offset

None

String

流式处理消费的起始偏移量。如何解析和比较偏移量取决于您的订单。对于 create-time 和 partition-time,应为时间戳字符串 (yyyy-[m]m-[d]d [hh:mm:ss])。

对于partition-time,将使用分区时间提取器从分区中提取时间。对于 partition-name,是分区名称字符串(例如 pt_year=2020/pt_mon=10/pt_day=01)。

is_lakehouse

Boolean

如果使用hive语法的DLI Lakehouse表,则需要设置is_lakehouse为true。

  • Source Parallelism Inference

    默认情况下,Flink 会根据文件数量和每个文件中的块数来推断其 Hive 读取器的最佳并行度。

    Flink 支持灵活配置并行推理策略。您可以在 TableConfig 中配置以下参数(请注意,这些参数会影响作业的所有源):

    Key

    Default

    Type

    Description

    table.exec.hive.infer-source-parallelism

    true

    Boolean

    如果为 true,则根据拆分数推断源并行度。如果为 false,则源的并行度由 config 设置。

    table.exec.hive.infer-source-parallelism.max

    1000

    Integer

    设置源运算符的最大推断并行度。

  • Load Partition Splits

    多线程用于拆分 hive 的分区。您可以使用 table.exec.hive.load-partition-splits.thread-num 来配置线程号。默认值为 3,配置的值应大于 0。

    Key

    Default

    Type

    Description

    table.exec.hive.load-partition-splits.thread-num

    3

    Integer

    配置的值应大于0。

    SQL 提示可用于将配置应用于 Hive 表,而无需更改其在 Hive 元存储中的定义。Hints | Apache Flink

  • Vectorized Optimization upon Read

    当满足以下条件时,Flink 会自动对 Hive 表进行矢量化读取:

    • 格式:ORC 或 Parquet。
    • 没有复杂数据类型的列,如配置单元类型:List、Map、Struct、Union。
      默认情况下,此功能处于启用状态。可以使用以下配置禁用它。
      table.exec.hive.fallback-mapred-reader=true
  • Reading Hive Views

    Flink 能够从 Hive 定义的视图中读取数据,但存在一些限制:

    • 必须先将 Hive 目录设置为当前目录,然后才能查询视图。这可以通过表 API 中的 tableEnv.useCatalog(...) 或 USE CATALOG ...在 SQL 客户端中。
    • Hive 和 Flink SQL 具有不同的语法,例如不同的保留关键字和文字。确保视图的查询与 Flink 语法兼容。

示例

  1. 使用Spark SQL创建Hive语法OBS表,并插入10条数据。模拟数据源。
    CREATE TABLE IF NOT EXISTS demo.student(
      name STRING, 
     score DOUBLE) 
    PARTITIONED BY (classNo INT) 
    STORED AS PARQUET 
    LOCATION 'obs://demo/spark.db/student';
    
    INSERT INTO demo.student PARTITION(classNo=1) VALUES ('Alice', 90.0), ('Bob', 80.0), ('Charlie', 70.0), ('David', 60.0), ('Eve', 50.0), ('Frank', 40.0), ('Grace', 30.0), ('Hank', 20.0), ('Ivy', 10.0), ('Jack', 0.0);
  2. 使用Flink SQL展示使用批的方式,从Hive语法OBS表demo.student中读取数据,并打印。需要开启checkpoint。
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'demo',
         'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    create table if not exists print (
        name STRING, 
        score DOUBLE, 
        classNo INT)
    with ('connector' = 'print');
    
    insert into print
    select * from student;
    结果(taskmanager的out日志):
    +I[Alice, 90.0, 1]
    +I[Bob, 80.0, 1]
    +I[Charlie, 70.0, 1]
    +I[David, 60.0, 1]
    +I[Eve, 50.0, 1]
    +I[Frank, 40.0, 1]
    +I[Grace, 30.0, 1]
    +I[Hank, 20.0, 1]
    +I[Ivy, 10.0, 1]
    +I[Jack, 0.0, 1]
  3. 使用Flink SQL展示使用流的方式,从Hive语法OBS表demo.student中读取数据,并打印。
    CREATE CATALOG myhive WITH (
        'type' = 'hive' ,
        'default-database' = 'demo',
         'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    create table if not exists print (
        name STRING, 
        score DOUBLE, 
        classNo INT)
    with ('connector' = 'print');
    
    insert into print
    select * from student /*+ OPTIONS('streaming-source.enable' = 'true', 'streaming-source.monitor-interval' = '3 m') */;

上述使用了sql hints功能, SQL 提示可用于将配置应用于 Hive 表,而无需更改其在 Hive 元存储中的定义。详情请参考:SQL Hints