配置Iceberg Catalog
创建Catalog
Catalog用于加载、创建和管理Iceberg表,具体通过配置spark.sql.catalog的相关属性来实现。
- 创建名为hive_prod的Iceberg Catalog,用于从Hive的元数据存储中加载表:
SET spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.hive_prod.type = hive;
SET spark.sql.catalog.hive_prod.uri = thrift://MetaStore IP地址:port;
其中,“spark.sql.catalog.hive_prod.uri”参数值可登录HiveServer实例所在节点,执行以下命令获取:
cat ${BIGDATA_HOME}/FusionInsight_HD_*/1_*_HiveServer/etc/hive-site.xml | grep -A 1 "hive.metastore.uris"
如果“hive.metastore.uris”有多个值,取任意一个即可。
- Iceberg同时支持基于HDFS的Catalog,只需将“type”设置为“hadoop”:
SET spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.hadoop_prod.type = hadoop;
SET spark.sql.catalog.hadoop_prod.warehouse = hdfs://NameNode IP地址:8020/warehouse/path;
其中,NameNode IP地址可登录Manager界面,选择“集群 > 服务 > HDFS > 实例”,在实例页面查看任一NameNode的IP地址获取。
配置Catalog
通过属性“spark.sql.catalog.catalog-name”添加具体的实现类来配置Catalog。支持以下两种类:
- org.apache.iceberg.spark.SparkCatalog
org.apache.iceberg.spark.SparkCatalog由Iceberg全权管理,主要支持Iceberg表,只使用Hive Metastore或 Hadoop的目录结构来存储这些元数据。
SET spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkCatalog;
- org.apache.iceberg.spark.SparkSessionCatalog
org.apache.iceberg.spark.SparkSessionCatalog由Spark的内置Catalog管理,支持多种表格式。识别到Iceberg表后委托给底层的Iceberg API;对于非Iceberg表(如Parquet、Hive),仍由Spark原生的元数据管理机制或Hive Metastore管理。
SET spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog;
常用的Catalog属性请参见下表:
|
参数 |
说明 |
|---|---|
|
spark.sql.catalog.catalog-name.type |
底层使用的Iceberg Catalog实现方式,对应Hive Catalog、Hadoop Catalog等。如果使用自定义目录,该参数值可为空。 取值范围为:
|
|
spark.sql.catalog.catalog-name.catalog-impl |
自定义的Iceberg Catalog实现类。如果“spark.sql.catalog.catalog-name.type”为空,必须设置该参数值。 |
|
spark.sql.catalog.catalog-name.io-impl |
自定义的文件IO实现类。 |
|
spark.sql.catalog.catalog-name.metrics-reporter-impl |
自定义的指标报告器实现类。 |
|
spark.sql.catalog.catalog-name.default-namespace |
目录的默认命名空间,默认值为“default”。 |
|
spark.sql.catalog.catalog-name.uri |
对于Hive类型的目录,值为Hive元数据存储的URL;对于REST类型的目录,值为REST服务的URL。参数值格式为“thrift://host:port”。 |
|
spark.sql.catalog.catalog-name.warehouse |
仓库目录的基础路径,为HDFS或OBS路径。 |
|
spark.sql.catalog.catalog-name.cache-enabled |
是否启用目录缓存,默认值为“true”。 |
|
spark.sql.catalog.catalog-name.cache.expiration-interval-ms |
缓存的目录条目过期时间,仅当“cache-enabled”为“true”时有效。默认值为30000(30秒),其中:
|
|
spark.sql.catalog.catalog-name.table-default.propertyKey |
表属性propertyKey的默认值,使用该目录创建表时,如果没特别指定,就使用默认值。 |
|
spark.sql.catalog.catalog-name.table-override.propertyKey |
表属性propertyKey的强制值,用户不能修改。 |
|
spark.sql.catalog.catalog-name.use-nullable-query-schema |
用CTAS和RTAS创建表时,是否保留字段的可空性,适用于Spark 3.5及之后版本。
|
使用Catalog
Catalog名用于在SQL查询中标识目标表。在创建Catalog的示例中,hive_prod与hadoop_prod可作为数据库名和表名的前缀,通过前缀可明确从对应的目录中加载目标数据库和表。
//执行CREATE TABLE建表操作后可执行以下命令: SELECT * FROM prod.db.sample;
Spark3会自动记录当前使用的Catalog与命名空间,若目标表处于当前Catalog及命名空间下,可省略Catalog名与命名空间前缀。
切换数据库:
USE prod.db;
查询数据:
SELECT * FROM sample;
加载自定义Catalog
Spark支持通过指定catalog-impl属性来加载自定义的Iceberg Catalog实现。配置示例如下:
SET spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog;
SET `spark.sql.catalog.custom_prod.catalog-impl` = com.my.custom.CatalogImpl;
SET `spark.sql.catalog.custom_prod.my-additional-catalog-config` = my-value;