创建Hive Catalog
简介
Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF。 元数据也可以是持久化的,例如Hive Metastore中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。详情参考Apache Flink Catalogs
功能描述
HiveCatalog有两个用途:作为原生Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。
Flink 的Hive 文档提供了有关设置 HiveCatalog以及访问现有 Hive 元数据的详细信息。详情参考:Apache Flink Hive Catalog
HiveCatalog可以用来处理两种类型的表:Hive兼容表和通用表。
- Hive兼容表是以Hive兼容的方式存储的,他们的元数据和实际的数据都在分层存储中。因此,通过flink创建的与hive兼容的表,可以通过hive查询。
- Hive通用表是特定于Flink的。当使用HiveCatalog创建通用表时,只是使用HMS来持久化元数据。虽然这些表对Hive来说是可见的,但Hive不太可能理解元数据。因此,在Hive中使用这样的表会导致未定义的行为。
建议切换到Hive方言来创建Hive兼容表。如果你想用默认的方言创建Hive兼容表,确保在你的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。如果使用Hive方言,就不需要connector属性。了解Hive方言。
注意事项
- 警告Hive Metastore以小写形式存储所有元数据对象名称。
- 如果使用相同名称的目录已经存在,那么将会抛出一个异常。
- Hudi表需要使用hudi catalog。并不适用于hive catalog。
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
语法格式
CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive;
参数说明
参数 |
必选 |
默认值 |
类型 |
描述 |
---|---|---|---|---|
type |
是 |
无 |
String |
Catalog的类型。 创建HiveCatalog时,该参数必须设置为'hive'。 |
hive-conf-dir |
是 |
无 |
String |
指向包含 hive-site.xml目录的URI。 该值固定为'hive-conf-dir' = '/opt/flink/conf' |
default-database |
否 |
default |
String |
当一个catalog被设为当前catalog时,所使用的默认当前database。 |
支持的类型
HiveCatalog支持所有Flink类型的通用表。
对于兼容Hive的表,HiveCatalog需要将Flink数据类型映射到相应的Hive类型。
Flink数据类型 |
Hive数据类型 |
---|---|
CHAR(p) |
CHAR(p) |
VARCHAR(p) |
VARCHAR(p) |
STRING |
STRING |
BOOLEAN |
BOOLEAN |
TINYINT |
TINYINT |
SMALLINT |
SMALLINT |
INT |
INT |
BIGINT |
LONG |
FLOAT |
FLOAT |
DOUBLE |
DOUBLE |
DECIMAL(p, s) |
DECIMAL(p, s) |
DATE |
DATE |
TIMESTAMP(9) |
TIMESTAMP |
BYTES |
BINARY |
ARRAY<T> |
LIST<T> |
MAP |
MAP |
ROW |
STRUCT |
- Hive的CHAR(p)最大长度为255。
- Hive的VARCHAR(p)最大长度为65535。
- Hive的MAP只支持原始类型的键,而Flink的MAP可以是任何数据类型。
- Hive的UNION类型不支持。
- Hive的TIMESTAMP总是精度为9,不支持其他精度。另一方面,Hive UDF可以处理精度<=9 的TIMESTAMP值。
- Hive不支持Flink的TIMESTAMP_WITH_TIME_ZONE。 TIMESTAMP_WITH_LOCAL_TIME_ZONE,和 MULTISET。
- Flink的INTERVAL类型还不能映射到Hive INTERVAL类型。
示例
- 在Flink OpenSource SQL作业中,创建名为myhive的catalog,并使用它用于管理元数据。
CREATE CATALOG myhive WITH ( 'type' = 'hive' ,'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table dataGenSource( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', --每秒生成一条数据 'fields.user_id.kind' = 'random', --为字段user_id指定random生成器 'fields.user_id.length' = '3' --限制user_id长度为3 ); create table printSink( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink select * from dataGenSource;
- 查看default数据库中,是否含有dataGenSource、printSink 表。
Hive Metastore 以小写形式存储所有元数据对象名称。
图1 查看default数据库
- 使用名为myhive的catalog中的元数据,新建Flink OpenSource SQL作业。
CREATE CATALOG myhive WITH ( 'type' = 'hive' ,'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; insert into printSink select * from dataGenSource;