更新时间:2026-06-11 GMT+08:00
分享

配置Spark对接Paimon

操作场景

本章节指导用户使用Spark对接Paimon提交作业。

约束与限制

  • 如果需要使用HetuEngine查询Hive客户端创建的Paimon表,表必须创建在单独的database下(即database的Location必须为“warehouse路径 + database名称 + .db后缀”,即“warehouse路径/database名称.db”),不能使用default database,否则HetuEngine查询会报错提示表不存在。

前提条件

  • 集群中已安装HDFS、Zookeeper、Yarn、Spark、Hive组件。
  • 已安装集群的Spark客户端。

使用Spark对接Paimon提交作业

  1. 使用客户端安装用户登录Spark客户端节点,进入客户端安装目录。

    cd 客户端安装目录

    加载环境变量:

    source bigdata_env

    加载组件环境变量:

    source Spark/component_env

    安全模式执行以下命令,普通模式无需执行:

    kinit 组件业务用户

    输入密码完成认证(首次登录需要修改密码)

  2. 获取paimon依赖包的完整路径。

    cd {客户端安装路径}/Spark/spark/jars/paimon/
    ll

    不同版本paimon-spark和paimon-hive-connector-common的版本号不同:

    -rwxr-x---. 1 root root 45M Mar 14 08:56 paimon-hive-connector-common-{具体版本号}.jar

    -rwxr-x---. 1 root root 41M Mar 14 08:56 paimon-spark-3.5-{具体版本号}.jar

  3. 执行如下命令,使用spark-sql对接paimon:

    spark-sql \
    --master yarn \
    --jars {客户端安装路径}/Spark/spark/jars/paimon/{paimon-spark完整包名},{客户端安装路径}/Spark/spark/jars/paimon/{paimon-hive-connector-common完整包名} \
    --driver-class-path {客户端安装路径}/Spark/spark/jars/paimon/{paimon-spark完整包名}:{客户端安装路径}/Spark/spark/jars/paimon/{paimon-hive-connector-common完整包名} \
    --conf spark.sql.catalog.{paimon_catalog_name}=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.{paimon_catalog_name}.warehouse=${HIVE_METASTORE_WAREHOUSE_DIR} \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \
    --conf spark.sql.catalog.{paimon_catalog_name}.metastore=hive \
    --conf spark.sql.catalog.{paimon_catalog_name}.uri=${HIVE_METASTORE_URI_DEFAULT}
    1. 所有对Paimon表的操作都要先USE到创建的paimon catalog下才能执行。
      USE {paimon_catalog_name};
    2. 如果涉及Spark写Paimon表,Hive读取,涉及Timestamp类型读写时,只支持TIMESTAMP_NTZ类型的Timestamp跨引擎读写,Spark创建表前需将spark.sql.timestampType参数设置为TIMESTAMP_NTZ。
    3. 如果涉及Spark读取其他引擎写的Paimon表的Timestamp类型字段,需要将spark.sql.session.timeZone设置为与写Paimon表的引擎的时区配置保持一致,否则可能出现结果不一致的情况,如果写入引擎使用的TIMESTAMP_NTZ类型的Timestamp,需将spark.sql.timestampType参数设置为TIMESTAMP_NTZ。
    4. 注意--jars和--driver-class-path后路径的分隔符不同,分别是“,”和“:”。

    使用说明:

    1. “${HIVE_METASTORE_URI_DEFAULT}”参数值获取方式:

      登录Manager界面,选择“集群 > 服务 > Hive > 配置 > 全部配置 > MetaStore(角色)”,搜索参数HIVE_METASTORE_URI_DEFAULT,即可获取参数值。

    2. "${HIVE_METASTORE_WAREHOUSE_DIR}"参数获取方式:

      登录Manager界面,选择“集群 > 服务 > Hive > 配置 > 全部配置 > MetaStore(角色)”,搜索参数hive.metastore.warehouse.dir,即可获取参数值。

    3. spark-submit和spark-shell作业提交方式与spark-sql一致。
    4. spark-beeline场景,需要将客户端下的“{客户端安装路径}/Spark/spark/jars/paimon/paimon-spark-3.5*.jar”和“{客户端安装路径}/Spark/spark/jars/paimon/paimon-hive-connector-common*.jar”上传到HDFS或OBS统一归档路径。

      登录Manager界面,选择“集群 > 服务 > Spark > 配置 > 全部配置 > JDBCServer(角色) ”,在“custom”中添加表1配置,其次按表2修改配置项,然后重启JDBCserver。

      表1 添加到custom中的配置项

      名称

      spark.jars

      {hdfs或obs统一归档路径}/paimon-spark-3.5*.jar,{hdfs或obs统一归档路径}/paimon-hive-connector-common*.jar

      spark.sql.catalog.{paimon_catalog_name}

      org.apache.paimon.spark.SparkCatalog

      spark.sql.catalog.{paimon_catalog_name}.warehouse

      ${HIVE_METASTORE_WAREHOUSE_DIR}

      spark.sql.catalog.{paimon_catalog_name}.metastore

      hive

      spark.sql.catalog.{paimon_catalog_name}.uri

      ${HIVE_METASTORE_URI_DEFAULT}

      表2 修改的配置项

      名称

      默认值值

      修改后的值

      spark.sql.extensions

      org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension

      org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

      • 如果Paimon表功能与Hudi功能冲突时,请删除spark.sql.extensions中的org.apache.spark.sql.hudi.HoodieSparkSessionExtension值。
      • spark.shuffle.manager需要保持默认值sort。
    5. 管理面提交作业场景,需要将客户端下的“{客户端安装路径}/Spark/spark/jars/paimon/paimon-spark-3.5*.jar”和“{客户端安装路径}/Spark/spark/jars/paimon/paimon-hive-connector-common*.jar”上传到HDFS或OBS统一归档路径,在提交作业时指定以下配置:
      • --jas {hdfs或obs统一归档路径}/paimon-spark-3.5*.jar,{hdfs或obs统一归档路径}/Spark/spark/jars/paimon/paimon-hive-connector-common*.jar
      • --conf spark.sql.catalog.{paimon_catalog_name}=org.apache.paimon.spark.SparkCatalog
      • --conf spark.sql.catalog.{paimon_catalog_name}.warehouse=${HIVE_METASTORE_WAREHOUSE_DIR}
      • --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      • --conf spark.sql.catalog.{paimon_catalog_name}.metastore=hive
      • --conf spark.sql.catalog.{paimon_catalog_name}.uri=${HIVE_METASTORE_URI_DEFAULT}

      所有对Paimon表的操作都要先USE到创建的paimon catalog下才能执行。

      USE {paimon_catalog_name};

Spark和Paimon数据类型转换

Spark Data Type

Paimon Data Type

Atomic Type

StructType

RowType

false

MapType

MapType

false

ArrayType

ArrayType

false

BooleanType

BooleanType

true

ByteType

TinyIntType

true

ShortType

SmallIntType

true

IntegerType

IntType

true

LongType

BigIntType

true

FloatType

FloatType

true

DoubleType

DoubleType

true

StringType

VarCharType(Integer.MAX_VALUE)

true

VarCharType(length)

VarCharType(length)

true

CharType(length)

CharType(length)

true

DateType

DateType

true

TimestampType

LocalZonedTimestamp

true

TimestampNTZType

TimestampType

true

DecimalType(precision, scale)

DecimalType(precision, scale)

true

BinaryType

VarBinaryType, BinaryType

true

SQL DDL

按照spark on paimon作业提交命令启动后,会默认创建一个catalog,可以使用以下SQL 切换到catalog {paimon_catalog_name}的指定数据库{database_name}:

USE {paimon_catalog_name}.{database_name};

所有对Paimon表的操作都要先USE到创建的paimon catalog下才能执行。

Create Table

  • 不支持CREATE TABLE LIKE语法。
  • 如果需要使用HetuEngine查询Hive客户端创建的Paimon表,表必须创建在单独的database下(即database的Location必须为“warehouse路径 + database名称 + .db后缀”),不能使用default database,否则HetuEngine查询会报错提示表不存在。

创建一个有五列的表 my_table ,其中 dt、hh和user_id是主键的表:

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
建分区表:
CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

Create External Table
CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
) LOCATION '/path/to/table';

Create Table As Select

使用CREATE TABLE AS SELECT时可以指定主键或者分区,语法请参考如下SQL:

CREATE TABLE my_table (
     user_id BIGINT,
     item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;

/* partitioned table*/
CREATE TABLE my_table_partition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;

/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;


/* primary key */
CREATE TABLE my_table_pk (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;

/* primary key + partition */
CREATE TABLE my_table_all (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;

Create Or Replace View

CREATE VIEW构建一个没有物理数据的虚拟表:

-- create a view.
CREATE VIEW v1 AS SELECT * FROM t1;

-- create a view, if a view of same name already exists, it will be replaced.
CREATE OR REPLACE VIEW v1 AS SELECT * FROM t1;

Drop View

DROP VIEW从目录中删除与指定视图相关的元数据:

DROP VIEW v1;

Create Or Replace Tag

使用以下选项创建或替换标签语法:

  • 创建带有或不带有快照ID和时间保留的标签。
  • 如果使用IF NOT EXISTS语法,创建已存在的标签不会失败。
  • 使用REPLACE TAGCREATE OR REPLACE TAG语法更新标签:
    -- create a tag based on the latest snapshot and no retention.
    ALTER TABLE T CREATE TAG `TAG-1`;
    
    -- create a tag based on the latest snapshot and no retention if it doesn't exist.
    ALTER TABLE T CREATE TAG IF NOT EXISTS `TAG-1`;
    
    -- create a tag based on the latest snapshot and retain it for 7 day.
    ALTER TABLE T CREATE TAG `TAG-2` RETAIN 7 DAYS;
    
    -- create a tag based on snapshot-1 and no retention.
    ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;
    
    -- create a tag based on snapshot-2 and retain it for 12 hour.
    ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;
    
    -- replace an existing tag with new snapshot id and new retention
    ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS;
    
    -- create or replace a tag, create tag if it not exist, replace tag if it exists.
    ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;

    如果设置了 tag.automatic-creation,则每个快照只能创建一个自动标签。

Delete Tag

删除表的一个或多个标签:

-- delete a tag.
ALTER TABLE T DELETE TAG `TAG-1`;

-- delete a tag if it exists.
ALTER TABLE T DELETE TAG IF EXISTS `TAG-1`

-- delete multiple tags, delimiter is ','.
ALTER TABLE T DELETE TAG `TAG-1,TAG-2`;

Rename Tag

使用新标签名称重命名现有标签:

ALTER TABLE T RENAME TAG `TAG-1` TO `TAG-2`;

Show Tags

列出表的所有标签:

SHOW TAGS T;

SQL写入

INSERT语句用于向表中插入新行或覆盖表中的现有数据。插入的行可以通过值表达式或查询结果指定。

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };

Insert Into

使用INSERT INTO将记录和更改应用到表:

INSERT INTO my_table SELECT ...

Insert Overwrite

使用INSERT OVERWRITE覆盖分区:

INSERT OVERWRITE my_table SELECT ...

Dynamic Overwrite Partition

将Spark配置spark.sql.sources.partitionOverwriteMode 设置为dynamic:

CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');
-- or 
INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  3| p1|
+---+---+
*/

-- Static overwrite with specified partitions (Only overwrite pt='p1')
INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3);

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/

-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/

Truncate Table

TRUNCATE TABLE语句从表或分区中删除所有行:

TRUNCATE TABLE my_table;

Paimon添加了防呆设计,当执行Truncate操作时,不会真正删除数据,只是给数据做了标记,等数据过期后才会删除。

snapshot.time-retainedsnapshot.num-retained.min同时满足时,Paimon 写入器在提交新更改时会自动执行过期操作。

Update Table

更新与谓词匹配的行的列值。如果未提供谓词,则更新所有行的列值。

当目标表为主键表时,不支持更新主键列。

Spark 支持更新PrimitiveType和StructType:

-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

CREATE TABLE t (
  id INT, 
  s STRUCT<c1: INT, c2: STRING>, 
  name STRING)
TBLPROPERTIES (
  'primary-key' = 'id', 
  'merge-engine' = 'deduplicate'
);

-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;

Delete From Table

删除与谓词匹配的行。如果未提供谓词,则删除所有行:

DELETE FROM my_table WHERE currency = 'UNKNOWN';

Merge Into Table

将基于源表的一组更新、插入和删除操作合并到目标表中。在更新子句中,当目标表是主键表时,不支持更新主键列。

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

Write Merge Schema:不支持该语法。

SQL查询

Paimon的批量读取会返回表快照中的所有数据。默认情况下,批量读取会返回最新的快照。

-- read all columns 
SELECT * FROM t;

Paimon 还支持读取一些隐藏的元数据列,目前支持以下列:

  • __paimon_file_path :记录的文件路径。
  • __paimon_partition :记录的分区。
  • __paimon_bucket :记录的存储桶。
  • __paimon_row_index :记录的行索引。
  • _ROW_ID :记录的唯一行 ID(仅当 row-tracking.enabled 设置为 true 时有效)。
  • _SEQUENCE_NUMBER :记录的序列号(仅当 row-tracking.enabled 设置为 true 时有效)。
SELECT *, __paimon_file_path, __paimon_partition, __paimon_bucket, __paimon_row_index FROM t;

Batch Time Travel

Paimon批量读取具有时间旅行功能,可以指定快照或标签并读取相应的数据。可以在查询中使用 VERSION AS OF 和 TIMESTAMP AS OF 来进行时间旅行:

查询表的快照信息:

desc `my_table$snapshots`;
/*
snapshot_id             bigint
schema_id               bigint
commit_user             string
commit_identifier       bigint
commit_kind             string
commit_time             timestamp_ntz
base_manifest_list      string
delta_manifest_list     string
changelog_manifest_list string
total_record_count      bigint
delta_record_count      bigint
changelog_record_count  bigint
watermark               bigint
*/

SELECT * FROM `my_table$snapshots`;

查询表快照信息时,反引号不能少,如:`table_name$snapshots`。

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;

-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

如果标签名称为数字且等于快照 ID,则 VERSION AS OF 语法会优先考虑标签。例如,如果您有一个名为“1”的标签,且该标签基于快照 2,则语句 SELECT * FROM t VERSION AS OF '1' 实际上会查询快照 2,而不是快照 1。

Batch Incremental

读取开始快照(独占)和结束快照之间的增量变化。

默认情况下,将扫描生成更改日志文件的表的更改日志文件。否则,将扫描新更改的文件。您也可以强制指定 'incremental-between-scan-mode' 。

Paimon支持在Spark SQL中使用函数paimon_incremental_query、paimon_incremental_between_timestamp、paimon_incremental_to_auto_tag进行Spark Table Valued Function实现的增量查询。

-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);

-- read the incremental data between ts 1692169900000 and ts 1692169900000.
SELECT * FROM paimon_incremental_between_timestamp('tableName', '1692169000000', '1692169900000');
SELECT * FROM paimon_incremental_between_timestamp('tableName', '2025-03-12 00:00:00', '2025-03-12 00:08:00');

-- read the incremental data to tag '2024-12-04'.
-- Paimon will find an earlier tag and return changes between them.
-- If the tag doesn't exist or the earlier tag doesn't exist, return empty.
SELECT * FROM paimon_incremental_to_auto_tag('tableName', '2024-12-04');

批量 SQL 不允许返回 DELETE 记录,因此 -D 的记录会被丢弃。如果要查看 DELETE 记录,可以查询 audit_log 表

Query Optimization

强烈建议在查询时指定分区和主键过滤器,这将加快查询的数据跳过速度。

可以加速数据跳过的过滤函数有:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

例如定义一个表:

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
);

通过为主键最左边的前缀指定范围过滤器,查询获得了良好的加速:

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;

但是下面的过滤器不能很好地加速查询。

SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

SQL修改表属性

Changing/Adding Table Properties

将write-buffer-size表属性设置为256 MB:

ALTER TABLE my_table SET TBLPROPERTIES (
    'write-buffer-size' = '256 MB'
);

Removing Table Properties

删除write-buffer-size表属性:

ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');

Changing/Adding Table Comment

将表my_table的注释更改为table comment:

ALTER TABLE my_table SET TBLPROPERTIES (
    'comment' = 'table comment'
    );

Removing Table Comment

删除表注释:

ALTER TABLE my_table UNSET TBLPROPERTIES ('comment');

Rename Table Name

将表名重命名为新名称:

ALTER TABLE my_table RENAME TO my_table_new;

Adding New Columns

将表my_table添加两列c1和c2:

ALTER TABLE my_table ADD COLUMNS (
    c1 INT,
    c2 STRING
);

向结构类型添加嵌套列f3:

-- column v previously has type STRUCT<f1: STRING, f2: INT>
ALTER TABLE my_table ADD COLUMN v.f3 STRING;

向结构体类型添加嵌套列f3 ,该结构体类型是array类型的元素类型:

-- column v previously has type ARRAY<STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table ADD COLUMN v.element.f3 STRING;

向结构类型添加嵌套列f3 ,该结构类型是map类型的值类型:

-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table ADD COLUMN v.value.f3 STRING;

Renaming Column Name

将表my_table中的列c0重命名为c1:

ALTER TABLE my_table RENAME COLUMN c0 TO c1;

将结构类型中的嵌套列f1重命名为f100:

-- column v previously has type STRUCT<f1: STRING, f2: INT>
ALTER TABLE my_table RENAME COLUMN v.f1 to f100;

将结构体类型中的嵌套列f1重命名为f100 ,该结构体类型是数组类型的元素类型:

-- column v previously has type ARRAY<STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table RENAME COLUMN v.element.f1 to f100;

将结构体类型中的嵌套列f1重命名为f100 ,该结构体类型是映射类型的值类型:

-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table RENAME COLUMN v.value.f1 to f100;

Dropping Columns

将表 my_table 中删除两列c1和c2:

ALTER TABLE my_table DROP COLUMNS (c1, c2);

从结构类型中删除嵌套列f2:

-- column v previously has type STRUCT<f1: STRING, f2: INT>
ALTER TABLE my_table DROP COLUMN v.f2;

从结构类型中删除嵌套列f2 ,该结构类型是数组类型的元素类型:

-- column v previously has type ARRAY<STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table DROP COLUMN v.element.f2;

从结构类型中删除嵌套列f2 ,该结构类型是映射类型的值类型:

-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table DROP COLUMN v.value.f2;

在hive catalog中,您需要确保:

  • 在Hive服务器中禁用 hive.metastore.disallow.incompatible.col.type.changes
  • 在spark中spark-sql --conf spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes=false

否则此操作可能失败,抛出类似 The following columns have types incompatible with the existing columns in their respective positions 的异常。

Dropping Partitions

删除paimon表的分区。对于spark sql,需要指定所有分区列:

ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');

Changing Column Comment

将列buy_count的注释更改为buy count:

ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';

Adding Column Position

添加列是设置位置:

ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;

更改列位置:

ALTER TABLE my_table ALTER COLUMN col_a FIRST;
ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;

Changing Column Type

更改列类型:

ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;

ALTER DATABASE

在指定的数据库中设置一个或多个属性。如果数据库中已设置某个属性,则用新值覆盖旧值:

ALTER { DATABASE | SCHEMA | NAMESPACE } my_database
    SET { DBPROPERTIES | PROPERTIES } ( property_name = property_value [ , ... ] )

更改数据库Location:

ALTER DATABASE my_database SET LOCATION xxx

辅助语句

Set和Reset

SET命令用于设置属性、返回现有属性的值或返回所有包含其值和含义的 SQLConf 属性。

RESET命令用于将当前会话中通过SET命令设置的运行时配置重置为默认值。

要全局设置动态选项,需要添加spark.paimon. 前缀。您还可以使用以下格式设置动态表选项: spark.paimon.${catalogName}.${dbName}.${tableName}.${config_key} 。catalogName/dbName/tableName 可以为 * ,表示匹配所有指定部分。如果存在冲突,动态表选项将覆盖全局选项

-- set spark conf
SET spark.sql.sources.partitionOverwriteMode=dynamic;

-- set paimon conf
SET spark.paimon.file.block-size=512M;

-- reset conf
RESET spark.paimon.file.block-size;

-- set scan.snapshot-id=1 for the table default.T in any catalogs
SET spark.paimon.*.default.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=1 for the table T in any databases and catalogs
SET spark.paimon.*.*.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=2 for the table default.T1 in any catalogs and scan.snapshot-id=1 on other tables
SET spark.paimon.scan.snapshot-id=1;
SET spark.paimon.*.default.T1.scan.snapshot-id=2;
SELECT * FROM default.T1 JOIN default.T2 ON xxxx;

DataFrame

Paimon支持通过Spark DataFrame API创建表、插入数据和查询。

使用DataFrame API前,需要先use到创建的paimon catalog下,如:spark.sql("use paimon")。

可以使用 option 指定表属性,或者根据需要使用 partitionBy 设置分区列

import org.apache.spark.sql._  
val data: DataFrame = Seq((1, "x1", "p1"), (2, "x2", "p2")).toDF("a", "b", "pt")
spark.sql("use paimon")
data.write.format("paimon")
  .option("primary-key", "a,pt")
  .option("k1", "v1")
  .partitionBy("pt")
  .saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")

Insert Into

可以通过将模式设置为 append 来实现 INSERT INTO 语义

import org.apache.spark.sql._ 
spark.sql("use paimon")
val data: DataFrame = ...
data.write.format("paimon")
  .mode("append")
  .insertInto("test_tbl") // or .saveAsTable("test_tbl") or .save("/path/to/default.db/test_tbl")

insertInto 忽略列名,仅使用基于位置的写入,如果需要按列名写入,请使用 saveAsTable 或 save 。

Insert Overwrite

可以通过将模式设置为使用 insertInto overwrite 来实现 INSERT OVERWRITE 语义

它支持对分区表进行动态分区覆盖。要启用动态覆盖,需要将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic 。

import org.apache.spark.sql._ 
spark.sql("use paimon")
val data: DataFrame = ...

data.write.format("paimon")
  .mode("overwrite")
  .insertInto("test_tbl")

Replace Table

可以通过将模式设置为使用 saveAsTable 或 save overwrite 来实现 REPLACE TABLE 语义

import org.apache.spark.sql._ 
spark.sql("use paimon")
val data: DataFrame = ...
data.write.format("paimon")
  .option("primary-key", "a,pt")
  .option("k1", "v1")
  .partitionBy("pt")
  .mode("overwrite")
  .saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")

Query

spark.sql("use paimon")
spark.read.format("paimon")
  .table("t") // or .load("/path/to/default.db/test_tbl")
  .show()

// recommend
spark.read.format("paimon")
  .table("<catalogName>.<databaseName>.<tableName>")

// or
spark.read.format("paimon")
  .option("catalog", "<catalogName>")
  .option("database", "<databaseName>")
  .option("table", "<tableName>")
  .load("/path/to/default.db/test_tbl")

// time travel
spark.read.format("paimon")
  .option("scan.snapshot-id", 1)
  .table("test_tbl")

Paimon适配Ranger鉴权

Paimon适配Ranger鉴权如下表所示:

SQL语句类型

SQL语句

Ranger权限

DDL

Create Table

Create

Create External Table

Create

Create Table As Select

Create/Select/Update

Create View

Create/Select

Replace View

Create/Select

Drop View

Drop

Create Tag

ALTER

Replace Tag

ALTER

Delete Tag

ALTER

Rename Tag

ALTER

Show Tags

SELECT

Write

Insert Into

UPDATE

Insert Overwrite

SELECT/UPDATE

Insert Overwrite Partition

SELECT/UPDATE

Dynamic Overwrite Partition

SELECT/UPDATE

Truncate Table

UPDATE

Update Table

UPDATE

Delete From Table

UPDATE

Merge Into Table

SELECT/UPDATE

Query

Batch Query

SELECT

Batch Time Travel

SELECT

Batch Incremental

SELECT

Alter

Changing Table Properties

ALTER

Adding Table Properties

ALTER

Removing Table Properties

ALTER

Changing Table Comment

ALTER

Adding Table Comment

ALTER

Removing Table Comment

ALTER

Rename Table Name

ALTER

Adding New Columns

ALTER

Renaming Column Name

ALTER

Dropping Columns

ALTER

Dropping Partitions

ALTER

Changing Column Comment

ALTER

Adding Column Position

ALTER

Changing Column Position

ALTER

Changing Column Type

ALTER

Altering Database Location

ALTER

Auxiliary Statements

Describe table

SELECT

Show create table

SELECT

Show columns

SELECT

Show partitions

SELECT

Show Table Extended Partition

SELECT

Analyze table

SELECT

Procedures/Call

compact

UPDATE

expire_snapshots

UPDATE

expire_partitions

UPDATE

create_tag

UPDATE

create_tag_from_timestamp

UPDATE

rename_tag

UPDATE

replace_tag

UPDATE

delete_tag

UPDATE

expire_tags

UPDATE

rollback

UPDATE

rollback_to_timestamp

UPDATE

rollback_to_watermark

UPDATE

purge_files

UPDATE

migrate_database

UPDATE

migrate_table

UPDATE/SELECT

remove_orphan_files

UPDATE

remove_unexisting_files

UPDATE

repair

UPDATE

create_branch

UPDATE

delete_branch

UPDATE

fast_forward

UPDATE

reset_consumer

UPDATE

clear_consumers

UPDATE

mark_partition_done

UPDATE

refresh_object_table

UPDATE

compact_manifest

UPDATE

alter_view_dialect

UPDATE

相关文档