更新时间:2024-11-08 GMT+08:00

创建Hive Catalog

简介

Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF。 元数据也可以是持久化的,例如Hive Metastore中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。详情参考Apache Flink Catalogs

功能描述

HiveCatalog有两个用途:作为原生Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。

Flink 的Hive 文档提供了有关设置 HiveCatalog以及访问现有 Hive 元数据的详细信息。详情参考:Apache Flink Hive Catalog

HiveCatalog可以用来处理两种类型的表:Hive兼容表和通用表。

  • Hive兼容表是以Hive兼容的方式存储的,他们的元数据和实际的数据都在分层存储中。因此,通过flink创建的与hive兼容的表,可以通过hive查询。
  • Hive通用表是特定于Flink的。当使用HiveCatalog创建通用表时,只是使用HMS来持久化元数据。虽然这些表对Hive来说是可见的,但Hive不太可能理解元数据。因此,在Hive中使用这样的表会导致未定义的行为。

建议切换到Hive方言来创建Hive兼容表。如果你想用默认的方言创建Hive兼容表,确保在你的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。如果使用Hive方言,就不需要connector属性。了解Hive方言

注意事项

  • 警告Hive Metastore以小写形式存储所有元数据对象名称。
  • 如果使用相同名称的目录已经存在,那么将会抛出一个异常。
  • Hudi表需要使用hudi catalog。并不适用于hive catalog。
  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。

语法格式

CREATE CATALOG myhive
WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/flink/conf'
);

USE CATALOG myhive;

参数说明

表1 参数说明

参数

必选

默认值

类型

描述

type

String

Catalog的类型。 创建HiveCatalog时,该参数必须设置为'hive'。

hive-conf-dir

String

指向包含 hive-site.xml目录的URI。

该值固定为'hive-conf-dir' = '/opt/flink/conf'

default-database

default

String

当一个catalog被设为当前catalog时,所使用的默认当前database。

支持的类型

HiveCatalog支持所有Flink类型的通用表。

对于兼容Hive的表,HiveCatalog需要将Flink数据类型映射到相应的Hive类型。

表2 数据类型映射表

Flink数据类型

Hive数据类型

CHAR(p)

CHAR(p)

VARCHAR(p)

VARCHAR(p)

STRING

STRING

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

LONG

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

DATE

DATE

TIMESTAMP(9)

TIMESTAMP

BYTES

BINARY

ARRAY<T>

LIST<T>

MAP

MAP

ROW

STRUCT

  • Hive的CHAR(p)最大长度为255。
  • Hive的VARCHAR(p)最大长度为65535。
  • Hive的MAP只支持原始类型的键,而Flink的MAP可以是任何数据类型。
  • Hive的UNION类型不支持。
  • Hive的TIMESTAMP总是精度为9,不支持其他精度。另一方面,Hive UDF可以处理精度<=9 的TIMESTAMP值。
  • Hive不支持Flink的TIMESTAMP_WITH_TIME_ZONE。 TIMESTAMP_WITH_LOCAL_TIME_ZONE,和 MULTISET。
  • Flink的INTERVAL类型还不能映射到Hive INTERVAL类型。

示例

  1. 在Flink OpenSource SQL作业中,创建名为myhive的catalog,并使用它用于管理元数据。
    CREATE CATALOG myhive WITH (
        'type' = 'hive'
        ,'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    create table dataGenSource(
      user_id string,
      amount int
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1', --每秒生成一条数据
      'fields.user_id.kind' = 'random', --为字段user_id指定random生成器
      'fields.user_id.length' = '3' --限制user_id长度为3
    );
    
    create table printSink(
      user_id string,
      amount int
    ) with (
      'connector' = 'print'
    );
    
    insert into printSink select * from dataGenSource;
  2. 查看default数据库中,是否含有dataGenSource、printSink 表。

    Hive Metastore 以小写形式存储所有元数据对象名称。

    图1 查看default数据库
  3. 使用名为myhive的catalog中的元数据,新建Flink OpenSource SQL作业。
    CREATE CATALOG myhive WITH (
        'type' = 'hive'
        ,'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    insert into printSink select * from dataGenSource;