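Configuring Spark to Work with Paimon
Scenario
This section describes how to submit jobs from Spark to Paimon.
Constraints and Limitations
- To query a Paimon table created from the Hive client with HetuEngine, create the table in a dedicated database whose Location is "warehouse path/database name.db" (the warehouse path, followed by the database name and the .db suffix). Do not use the default database; otherwise, the HetuEngine query fails with an error indicating that the table does not exist.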
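The Location rule above can be sketched as a small shell helper. This is only an illustration: the function name expected_db_location and the sample warehouse path and database name are hypothetical. The printed path can be compared with the Location that a statement such as DESCRIBE DATABASE reports for the database.

```shell
#!/bin/sh
# Build the Location a database is expected to have:
# "<warehouse path>/<database name>.db".
expected_db_location() {
  local warehouse="${1%/}"   # drop one trailing slash, if present
  local database="$2"
  printf '%s/%s.db\n' "$warehouse" "$database"
}

# Hypothetical values, for illustration only.
expected_db_location "hdfs://hacluster/user/hive/warehouse" "paimon_db"
```

If the reported Location differs from the printed path, the table is not in a database that follows the rule.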
Prerequisites
- The HDFS, ZooKeeper, Yarn, Spark, and Hive components have been installed in the cluster.
- The Spark client of the cluster has been installed.
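Using Spark to Submit Jobs to Paimon
- Log in to the node where the Spark client is installed as the client installation user and go to the client installation directory.
cd Client installation directory
Load the environment variables:
source bigdata_env
Load the component environment variables:
source Spark/component_env
In security mode, run the following command to authenticate. Skip this step in normal mode.
kinit Component service user
Enter the password to complete authentication. (You must change the password upon first login.)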
- Obtain the full paths of the Paimon dependency packages.
cd {client installation path}/Spark/spark/jars/paimon/
ll
The version numbers in the paimon-spark and paimon-hive-connector-common package names vary by release:
-rwxr-x---. 1 root root 45M Mar 14 08:56 paimon-hive-connector-common-{version}.jar
-rwxr-x---. 1 root root 41M Mar 14 08:56 paimon-spark-3.5-{version}.jar
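Because the version part of each file name differs between releases, it can help to resolve the full jar path with a glob instead of typing it by hand. The following is a minimal sketch under that assumption; the function name resolve_jar and the variable PAIMON_JAR_DIR are hypothetical.

```shell
#!/bin/sh
# Print the full path of the single jar in a directory whose name starts
# with the given prefix. Fail if there is not exactly one match.
resolve_jar() {
  local dir="$1" prefix="$2" count=0 found="" f
  for f in "$dir"/"$prefix"*.jar; do
    if [ -e "$f" ]; then        # an unmatched glob stays literal, so test it
      count=$((count + 1))
      found="$f"
    fi
  done
  if [ "$count" -ne 1 ]; then
    echo "expected exactly one ${prefix}*.jar in ${dir}, found ${count}" >&2
    return 1
  fi
  printf '%s\n' "$found"
}

# Usage (PAIMON_JAR_DIR is a hypothetical variable that points to the
# Spark/spark/jars/paimon directory of the client):
#   PAIMON_SPARK_JAR=$(resolve_jar "$PAIMON_JAR_DIR" "paimon-spark-")
#   PAIMON_HIVE_JAR=$(resolve_jar "$PAIMON_JAR_DIR" "paimon-hive-connector-common-")
```

Failing when zero or several jars match avoids silently passing a wrong or ambiguous package to Spark.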
- Run the following command to connect spark-sql to Paimon:
spark-sql \
--master yarn \
--jars {client installation path}/Spark/spark/jars/paimon/{full paimon-spark package name},{client installation path}/Spark/spark/jars/paimon/{full paimon-hive-connector-common package name} \
--driver-class-path {client installation path}/Spark/spark/jars/paimon/{full paimon-spark package name}:{client installation path}/Spark/spark/jars/paimon/{full paimon-hive-connector-common package name} \
--conf spark.sql.catalog.{paimon_catalog_name}=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.{paimon_catalog_name}.warehouse=${HIVE_METASTORE_WAREHOUSE_DIR} \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \
--conf spark.sql.catalog.{paimon_catalog_name}.metastore=hive \
--conf spark.sql.catalog.{paimon_catalog_name}.uri=${HIVE_METASTORE_URI_DEFAULT}
- All operations on Paimon tables must be performed after switching to the created Paimon catalog with USE.
USE {paimon_catalog_name};
- When Spark writes a Paimon table that Hive reads and Timestamp columns are involved, only TIMESTAMP_NTZ timestamps can be read and written across engines. Before creating the table in Spark, set spark.sql.timestampType to TIMESTAMP_NTZ.
- When Spark reads Timestamp fields of a Paimon table written by another engine, set spark.sql.session.timeZone to the same time zone as the writing engine; otherwise, the results may be inconsistent. If the writing engine uses TIMESTAMP_NTZ timestamps, also set spark.sql.timestampType to TIMESTAMP_NTZ.
- Note that --jars and --driver-class-path use different path separators: a comma (,) and a colon (:), respectively.
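To avoid mixing up the two separators, both argument values can be built from the same list of jar paths. The sketch below is an illustration only: the helper join_by and the jar paths (including /opt/client and the x.y.z version) are hypothetical and must be replaced with the full paths obtained earlier.

```shell
#!/bin/sh
# Join all arguments after the first one, using the first argument as separator.
join_by() {
  local sep="$1"
  shift
  if [ "$#" -eq 0 ]; then
    return 0
  fi
  local out="$1" item
  shift
  for item in "$@"; do
    out="${out}${sep}${item}"
  done
  printf '%s\n' "$out"
}

# Hypothetical jar paths, for illustration only.
SPARK_JAR="/opt/client/Spark/spark/jars/paimon/paimon-spark-3.5-x.y.z.jar"
HIVE_JAR="/opt/client/Spark/spark/jars/paimon/paimon-hive-connector-common-x.y.z.jar"

JARS_ARG=$(join_by , "$SPARK_JAR" "$HIVE_JAR")        # value for --jars
CLASSPATH_ARG=$(join_by : "$SPARK_JAR" "$HIVE_JAR")   # value for --driver-class-path

echo "--jars ${JARS_ARG}"
echo "--driver-class-path ${CLASSPATH_ARG}"
```

The two printed values can then be pasted into the spark-sql command in place of the hand-written path lists.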
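Usage notes:
- To obtain the value of "${HIVE_METASTORE_URI_DEFAULT}":
Log in to Manager, choose "Cluster > Services > Hive > Configurations > All Configurations > MetaStore(Role)", and search for the parameter HIVE_METASTORE_URI_DEFAULT to obtain its value.
- To obtain the value of "${HIVE_METASTORE_WAREHOUSE_DIR}":
Log in to Manager, choose "Cluster > Services > Hive > Configurations > All Configurations > MetaStore(Role)", and search for the parameter hive.metastore.warehouse.dir to obtain its value.
- Jobs are submitted with spark-submit and spark-shell in the same way as with spark-sql.
- For spark-beeline, upload "{client installation path}/Spark/spark/jars/paimon/paimon-spark-3.5*.jar" and "{client installation path}/Spark/spark/jars/paimon/paimon-hive-connector-common*.jar" from the client to the unified archive path on HDFS or OBS.
Log in to Manager, choose "Cluster > Services > Spark > Configurations > All Configurations > JDBCServer(Role)", add the configuration items in Table 1 to "custom", modify the configuration item as described in Table 2, and then restart JDBCServer.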
Table 1 Configuration items to add to custom
| Name | Value |
|---|---|
| spark.jars | {unified archive path on HDFS or OBS}/paimon-spark-3.5*.jar,{unified archive path on HDFS or OBS}/paimon-hive-connector-common*.jar |
| spark.sql.catalog.{paimon_catalog_name} | org.apache.paimon.spark.SparkCatalog |
| spark.sql.catalog.{paimon_catalog_name}.warehouse | ${HIVE_METASTORE_WAREHOUSE_DIR} |
| spark.sql.catalog.{paimon_catalog_name}.metastore | hive |
| spark.sql.catalog.{paimon_catalog_name}.uri | ${HIVE_METASTORE_URI_DEFAULT} |
Table 2 Configuration item to modify
| Name | Default Value | New Value |
|---|---|---|
| spark.sql.extensions | org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension | org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions |
- If Paimon table features conflict with Hudi features, remove org.apache.spark.sql.hudi.HoodieSparkSessionExtension from the value of spark.sql.extensions.
- Keep spark.shuffle.manager at its default value, sort.
- To submit jobs from the management plane, upload "{client installation path}/Spark/spark/jars/paimon/paimon-spark-3.5*.jar" and "{client installation path}/Spark/spark/jars/paimon/paimon-hive-connector-common*.jar" from the client to the unified archive path on HDFS or OBS, and specify the following configurations when submitting the job:
- --jars {unified archive path on HDFS or OBS}/paimon-spark-3.5*.jar,{unified archive path on HDFS or OBS}/paimon-hive-connector-common*.jar
- --conf spark.sql.catalog.{paimon_catalog_name}=org.apache.paimon.spark.SparkCatalog
- --conf spark.sql.catalog.{paimon_catalog_name}.warehouse=${HIVE_METASTORE_WAREHOUSE_DIR}
- --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
- --conf spark.sql.catalog.{paimon_catalog_name}.metastore=hive
- --conf spark.sql.catalog.{paimon_catalog_name}.uri=${HIVE_METASTORE_URI_DEFAULT}
All operations on Paimon tables must be performed after switching to the created Paimon catalog with USE.
USE {paimon_catalog_name};
Data Type Mapping Between Spark and Paimon
| Spark Data Type | Paimon Data Type | Atomic Type |
|---|---|---|
| StructType | RowType | false |
| MapType | MapType | false |
| ArrayType | ArrayType | false |
| BooleanType | BooleanType | true |
| ByteType | TinyIntType | true |
| ShortType | SmallIntType | true |
| IntegerType | IntType | true |
| LongType | BigIntType | true |
| FloatType | FloatType | true |
| DoubleType | DoubleType | true |
| StringType | VarCharType(Integer.MAX_VALUE) | true |
| VarCharType(length) | VarCharType(length) | true |
| CharType(length) | CharType(length) | true |
| DateType | DateType | true |
| TimestampType | LocalZonedTimestamp | true |
| TimestampNTZType | TimestampType | true |
| DecimalType(precision, scale) | DecimalType(precision, scale) | true |
| BinaryType | VarBinaryType, BinaryType | true |
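SQL DDL
After Spark is started with the Spark on Paimon job submission command, a catalog is created by default. Use the following SQL statement to switch to the database {database_name} in the catalog {paimon_catalog_name}:
USE {paimon_catalog_name}.{database_name};
All operations on Paimon tables must be performed after switching to the created Paimon catalog with USE.
Create Table
- The CREATE TABLE LIKE syntax is not supported.
- To query a Paimon table created from the Hive client with HetuEngine, create the table in a dedicated database whose Location is "warehouse path/database name.db" (the warehouse path, followed by the database name and the .db suffix). Do not use the default database; otherwise, the HetuEngine query fails with an error indicating that the table does not exist.
Create a table named my_table with five columns, in which dt, hh, and user_id form the primary key: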
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
Create a partitioned table:
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
Create a partitioned table with a specified LOCATION:
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
) LOCATION '/path/to/table';
Create Table As Select
When using CREATE TABLE AS SELECT, you can specify the primary key or partitions. For the syntax, see the following SQL statements:
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;
/* partitioned table*/
CREATE TABLE my_table_partition (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;
/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
user_id BIGINT,
item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;
/* primary key */
CREATE TABLE my_table_pk (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;
/* primary key + partition */
CREATE TABLE my_table_all (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
Create Or Replace View
CREATE VIEW creates a virtual table that holds no physical data:
-- create a view.
CREATE VIEW v1 AS SELECT * FROM t1;
-- create a view, if a view of same name already exists, it will be replaced.
CREATE OR REPLACE VIEW v1 AS SELECT * FROM t1;
Drop View
DROP VIEW removes the metadata associated with the specified view from the catalog:
DROP VIEW v1;
Create Or Replace Tag
Tags can be created or replaced with the following options:
- Create a tag with or without a snapshot ID and a time retention.
- With the IF NOT EXISTS syntax, creating a tag that already exists does not fail.
- Use the REPLACE TAG or CREATE OR REPLACE TAG syntax to update a tag:
-- create a tag based on the latest snapshot and no retention.
ALTER TABLE T CREATE TAG `TAG-1`;
-- create a tag based on the latest snapshot and no retention if it doesn't exist.
ALTER TABLE T CREATE TAG IF NOT EXISTS `TAG-1`;
-- create a tag based on the latest snapshot and retain it for 7 days.
ALTER TABLE T CREATE TAG `TAG-2` RETAIN 7 DAYS;
-- create a tag based on snapshot-1 and no retention.
ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;
-- create a tag based on snapshot-2 and retain it for 12 hours.
ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;
-- replace an existing tag with new snapshot id and new retention
ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS;
-- create or replace a tag: create the tag if it does not exist, replace it if it exists.
ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
If tag.automatic-creation is set, only one automatic tag can be created for each snapshot.
Delete Tag
Delete one or more tags of a table:
-- delete a tag.
ALTER TABLE T DELETE TAG `TAG-1`;
-- delete a tag if it exists.
ALTER TABLE T DELETE TAG IF EXISTS `TAG-1`;
-- delete multiple tags, delimiter is ','.
ALTER TABLE T DELETE TAG `TAG-1,TAG-2`;
Rename Tag
Rename an existing tag to a new tag name:
ALTER TABLE T RENAME TAG `TAG-1` TO `TAG-2`;
Show Tags
List all tags of a table:
SHOW TAGS T;
Writing Data with SQL
The INSERT statement inserts new rows into a table or overwrites the existing data in a table. The inserted rows can be specified by value expressions or by the result of a query.
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
Insert Into
Use INSERT INTO to apply records and changes to a table:
INSERT INTO my_table SELECT ...
Insert Overwrite
Use INSERT OVERWRITE to overwrite partitions:
INSERT OVERWRITE my_table SELECT ...
Dynamic Overwrite Partition
To overwrite partitions dynamically, set the Spark configuration spark.sql.sources.partitionOverwriteMode to dynamic:
CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');
-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');
-- or
INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1');
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  3| p1|
+---+---+
*/
-- Static overwrite with specified partitions (Only overwrite pt='p1')
INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3);
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/
-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/
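Truncate Table
The TRUNCATE TABLE statement removes all rows from a table or partition:
TRUNCATE TABLE my_table;
Paimon includes a safeguard against mistakes: a Truncate operation does not physically delete the data. It only marks the data, which is deleted after it expires.
When the conditions of both snapshot.time-retained and snapshot.num-retained.min are met, the Paimon writer automatically performs expiration when it commits new changes.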
Update Table
Update the column values of the rows that match a predicate. If no predicate is provided, the column values of all rows are updated.
When the target table is a primary key table, updating primary key columns is not supported.
Spark supports updating PrimitiveType and StructType:
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
CREATE TABLE t (
id INT,
s STRUCT<c1: INT, c2: STRING>,
name STRING)
TBLPROPERTIES (
'primary-key' = 'id',
'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
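Delete From Table
Delete the rows that match a predicate. If no predicate is provided, all rows are deleted:
DELETE FROM my_table WHERE currency = 'UNKNOWN';
Merge Into Table
Merge a set of updates, insertions, and deletions based on a source table into a target table. In the update clause, updating primary key columns is not supported when the target table is a primary key table.
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
Write Merge Schema: This syntax is not supported.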
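Querying Data with SQL
A Paimon batch read returns all data in a snapshot of the table. By default, a batch read returns the latest snapshot.
-- read all columns
SELECT * FROM t;
Paimon also supports reading some hidden metadata columns. The following columns are currently supported:
- __paimon_file_path: the file path of the record.
- __paimon_partition: the partition of the record.
- __paimon_bucket: the bucket of the record.
- __paimon_row_index: the row index of the record.
- _ROW_ID: the unique row ID of the record (valid only when row-tracking.enabled is set to true).
- _SEQUENCE_NUMBER: the sequence number of the record (valid only when row-tracking.enabled is set to true).
SELECT *, __paimon_file_path, __paimon_partition, __paimon_bucket, __paimon_row_index FROM t;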
Batch Time Travel
Paimon batch reads support time travel: you can specify a snapshot or a tag and read the corresponding data. Use VERSION AS OF and TIMESTAMP AS OF in a query to perform time travel.
Query the snapshot information of a table:
desc `my_table$snapshots`;
/*
snapshot_id bigint
schema_id bigint
commit_user string
commit_identifier bigint
commit_kind string
commit_time timestamp_ntz
base_manifest_list string
delta_manifest_list string
changelog_manifest_list string
total_record_count bigint
delta_record_count bigint
changelog_record_count bigint
watermark bigint
*/
SELECT * FROM `my_table$snapshots`;
When querying the snapshot information of a table, the backticks are required, for example, `table_name$snapshots`.
-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;
-- read the snapshot from specified timestamp
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;
-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';
If a tag name is a number that equals a snapshot ID, the VERSION AS OF syntax gives priority to the tag. For example, if you have a tag named "1" that is based on snapshot 2, the statement SELECT * FROM t VERSION AS OF '1' actually queries snapshot 2, not snapshot 1.
Batch Incremental
Read the incremental changes between the start snapshot (exclusive) and the end snapshot.
By default, changelog files are scanned for tables that generate changelog files; otherwise, the newly changed files are scanned. You can also force a scan mode by specifying 'incremental-between-scan-mode'.
In Spark SQL, Paimon supports incremental queries implemented as Spark table-valued functions: paimon_incremental_query, paimon_incremental_between_timestamp, and paimon_incremental_to_auto_tag.
-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);
-- read the incremental data between ts 1692169000000 and ts 1692169900000.
SELECT * FROM paimon_incremental_between_timestamp('tableName', '1692169000000', '1692169900000');
SELECT * FROM paimon_incremental_between_timestamp('tableName', '2025-03-12 00:00:00', '2025-03-12 00:08:00');
-- read the incremental data to tag '2024-12-04'.
-- Paimon will find an earlier tag and return changes between them.
-- If the tag doesn't exist or the earlier tag doesn't exist, return empty.
SELECT * FROM paimon_incremental_to_auto_tag('tableName', '2024-12-04');
Batch SQL cannot return DELETE records, so -D records are dropped. To view DELETE records, query the audit_log table.
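The paimon_incremental_between_timestamp examples above pass timestamps either as epoch milliseconds or as date-time strings. The following sketch converts a date-time string to epoch milliseconds. It assumes GNU date (the -d option) and treats the string as UTC; the function name to_epoch_millis is hypothetical, and the time zone that applies to your tables should be checked before relying on the result.

```shell
#!/bin/sh
# Convert a "YYYY-MM-DD hh:mm:ss" string, interpreted as UTC, to epoch milliseconds.
# Assumes GNU date, which accepts -d to parse a date string.
to_epoch_millis() {
  local seconds
  seconds=$(date -u -d "$1" +%s) || return 1
  printf '%s000\n' "$seconds"   # whole seconds, so the millisecond part is 000
}

to_epoch_millis '2025-03-12 00:00:00'
to_epoch_millis '2025-03-12 00:08:00'
```

The two printed values can be used as the start and end arguments in place of the date-time strings.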
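Query Optimization
It is strongly recommended to specify partition and primary key filters in queries, which lets the query skip more data and run faster.
The filter predicates that can accelerate data skipping are:
- =
- <
- <=
- >
- >=
- IN (...)
- LIKE 'abc%'
- IS NULL
For example, define a table: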
CREATE TABLE orders (
catalog_id BIGINT,
order_id BIGINT,
.....,
) TBLPROPERTIES (
'primary-key' = 'catalog_id,order_id'
);
Queries that specify range filters on the leftmost prefix of the primary key are accelerated well:
SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id>2035 AND order_id<6000;
However, the following filters cannot accelerate queries well:
SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
Altering Tables with SQL
Changing/Adding Table Properties
Set the write-buffer-size table property to 256 MB:
ALTER TABLE my_table SET TBLPROPERTIES (
'write-buffer-size' = '256 MB'
);
Removing Table Properties
Remove the write-buffer-size table property:
ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');
Changing/Adding Table Comment
Change the comment of the table my_table to table comment:
ALTER TABLE my_table SET TBLPROPERTIES (
'comment' = 'table comment'
);
Removing Table Comment
Remove the table comment:
ALTER TABLE my_table UNSET TBLPROPERTIES ('comment');
Rename Table Name
Rename the table to a new name:
ALTER TABLE my_table RENAME TO my_table_new;
Adding New Columns
Add two columns, c1 and c2, to the table my_table:
ALTER TABLE my_table ADD COLUMNS (
c1 INT,
c2 STRING
);
Add a nested column f3 to a struct type:
-- column v previously has type STRUCT<f1: STRING, f2: INT>
ALTER TABLE my_table ADD COLUMN v.f3 STRING;
Add a nested column f3 to a struct type that is the element type of an array type:
-- column v previously has type ARRAY<STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table ADD COLUMN v.element.f3 STRING;
Add a nested column f3 to a struct type that is the value type of a map type:
-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table ADD COLUMN v.value.f3 STRING;
Renaming Column Name
Rename the column c0 in the table my_table to c1:
ALTER TABLE my_table RENAME COLUMN c0 TO c1;
Rename the nested column f1 in a struct type to f100:
-- column v previously has type STRUCT<f1: STRING, f2: INT>
ALTER TABLE my_table RENAME COLUMN v.f1 to f100;
Rename the nested column f1 to f100 in a struct type that is the element type of an array type:
-- column v previously has type ARRAY<STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table RENAME COLUMN v.element.f1 to f100;
Rename the nested column f1 to f100 in a struct type that is the value type of a map type:
-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table RENAME COLUMN v.value.f1 to f100;
Dropping Columns
Drop two columns, c1 and c2, from the table my_table:
ALTER TABLE my_table DROP COLUMNS (c1, c2);
Drop the nested column f2 from a struct type:
-- column v previously has type STRUCT<f1: STRING, f2: INT>
ALTER TABLE my_table DROP COLUMN v.f2;
Drop the nested column f2 from a struct type that is the element type of an array type:
-- column v previously has type ARRAY<STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table DROP COLUMN v.element.f2;
Drop the nested column f2 from a struct type that is the value type of a map type:
-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table DROP COLUMN v.value.f2;
In a Hive catalog, make sure that:
- hive.metastore.disallow.incompatible.col.type.changes is disabled on the Hive server.
- In Spark, spark-sql is started with --conf spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes=false.
Otherwise, this operation may fail with an exception similar to "The following columns have types incompatible with the existing columns in their respective positions".
Dropping Partitions
Drop partitions of a Paimon table. In Spark SQL, all partition columns must be specified:
ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');
Changing Column Comment
Change the comment of the column buy_count to buy count:
ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';
Adding Column Position
Set the position when adding a column:
ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;
Change the position of a column:
ALTER TABLE my_table ALTER COLUMN col_a FIRST;
ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
Changing Column Type
Change the type of a column:
ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;
ALTER DATABASE
Set one or more properties in the specified database. If a property is already set in the database, the new value overwrites the old one:
ALTER { DATABASE | SCHEMA | NAMESPACE } my_database
SET { DBPROPERTIES | PROPERTIES } ( property_name = property_value [ , ... ] )
Change the database Location:
ALTER DATABASE my_database SET LOCATION xxx
Auxiliary Statements
Set and Reset
The SET command sets a property, returns the value of an existing property, or returns all SQLConf properties with their values and meanings.
The RESET command resets the runtime configurations that were set with the SET command in the current session to their default values.
To set dynamic options globally, add the spark.paimon. prefix. You can also set dynamic table options in the format spark.paimon.${catalogName}.${dbName}.${tableName}.${config_key}. Each of catalogName, dbName, and tableName can be *, which matches any value for that part. If there is a conflict, the dynamic table option overrides the global option.
-- set spark conf
SET spark.sql.sources.partitionOverwriteMode=dynamic;
-- set paimon conf
SET spark.paimon.file.block-size=512M;
-- reset conf
RESET spark.paimon.file.block-size;
-- set scan.snapshot-id=1 for the table default.T in any catalogs
SET spark.paimon.*.default.T.scan.snapshot-id=1;
SELECT * FROM default.T;
-- set scan.snapshot-id=1 for the table T in any databases and catalogs
SET spark.paimon.*.*.T.scan.snapshot-id=1;
SELECT * FROM default.T;
-- set scan.snapshot-id=2 for the table default.T1 in any catalogs and scan.snapshot-id=1 on other tables
SET spark.paimon.scan.snapshot-id=1;
SET spark.paimon.*.default.T1.scan.snapshot-id=2;
SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
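The key format for dynamic table options can be assembled mechanically, which helps when generating SET statements in scripts. The sketch below only builds strings and does not contact Spark; the function names paimon_table_option_key and paimon_set_statement are hypothetical.

```shell
#!/bin/sh
# Build a dynamic table option key of the form
# spark.paimon.<catalogName>.<dbName>.<tableName>.<config_key>.
# Pass '*' (quoted) for any of the first three parts to match all values.
paimon_table_option_key() {
  local catalog="$1" db="$2" table="$3" key="$4"
  printf 'spark.paimon.%s.%s.%s.%s\n' "$catalog" "$db" "$table" "$key"
}

# Build the complete SET statement for a key and a value.
paimon_set_statement() {
  local key
  key=$(paimon_table_option_key "$1" "$2" "$3" "$4")
  printf 'SET %s=%s;\n' "$key" "$5"
}

# Prints: SET spark.paimon.*.default.T.scan.snapshot-id=1;
paimon_set_statement '*' default T scan.snapshot-id 1
```

Quoting the asterisk keeps the shell from expanding it to file names before the key is built.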
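DataFrame
Paimon supports creating tables, inserting data, and querying data through the Spark DataFrame API.
Before using the DataFrame API, switch to the created Paimon catalog, for example, spark.sql("use paimon").
You can use option to specify table properties, and use partitionBy to set partition columns as needed:
import org.apache.spark.sql._
import spark.implicits._ // needed for toDF; spark-shell imports it automatically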
val data: DataFrame = Seq((1, "x1", "p1"), (2, "x2", "p2")).toDF("a", "b", "pt")
spark.sql("use paimon")
data.write.format("paimon")
.option("primary-key", "a,pt")
.option("k1", "v1")
.partitionBy("pt")
.saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")
Insert Into
INSERT INTO semantics can be achieved by setting the mode to append:
import org.apache.spark.sql._
spark.sql("use paimon")
val data: DataFrame = ...
data.write.format("paimon")
.mode("append")
.insertInto("test_tbl") // or .saveAsTable("test_tbl") or .save("/path/to/default.db/test_tbl")
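insertInto ignores column names and writes by position only. To write by column name, use saveAsTable or save.
Insert Overwrite
INSERT OVERWRITE semantics can be achieved by setting the mode to overwrite and using insertInto.
Dynamic partition overwrite is supported for partitioned tables. To enable it, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic.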
import org.apache.spark.sql._
spark.sql("use paimon")
val data: DataFrame = ...
data.write.format("paimon")
.mode("overwrite")
.insertInto("test_tbl")
Replace Table
REPLACE TABLE semantics can be achieved by setting the mode to overwrite and using saveAsTable or save:
import org.apache.spark.sql._
spark.sql("use paimon")
val data: DataFrame = ...
data.write.format("paimon")
.option("primary-key", "a,pt")
.option("k1", "v1")
.partitionBy("pt")
.mode("overwrite")
.saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")
Query
spark.sql("use paimon")
spark.read.format("paimon")
.table("t") // or .load("/path/to/default.db/test_tbl")
.show()
// recommended
spark.read.format("paimon")
.table("<catalogName>.<databaseName>.<tableName>")
// or
spark.read.format("paimon")
.option("catalog", "<catalogName>")
.option("database", "<databaseName>")
.option("table", "<tableName>")
.load("/path/to/default.db/test_tbl")
// time travel
spark.read.format("paimon")
.option("scan.snapshot-id", 1)
.table("test_tbl")
Ranger Authorization for Paimon
The following table lists the Ranger permissions required by Paimon SQL statements:
| SQL Statement Type | SQL Statement | Ranger Permission |
|---|---|---|
| DDL | Create Table | CREATE |
| | Create External Table | CREATE |
| | Create Table As Select | CREATE/SELECT/UPDATE |
| | Create View | CREATE/SELECT |
| | Replace View | CREATE/SELECT |
| | Drop View | DROP |
| | Create Tag | ALTER |
| | Replace Tag | ALTER |
| | Delete Tag | ALTER |
| | Rename Tag | ALTER |
| | Show Tags | SELECT |
| Write | Insert Into | UPDATE |
| | Insert Overwrite | SELECT/UPDATE |
| | Insert Overwrite Partition | SELECT/UPDATE |
| | Dynamic Overwrite Partition | SELECT/UPDATE |
| | Truncate Table | UPDATE |
| | Update Table | UPDATE |
| | Delete From Table | UPDATE |
| | Merge Into Table | SELECT/UPDATE |
| Query | Batch Query | SELECT |
| | Batch Time Travel | SELECT |
| | Batch Incremental | SELECT |
| Alter | Changing Table Properties | ALTER |
| | Adding Table Properties | ALTER |
| | Removing Table Properties | ALTER |
| | Changing Table Comment | ALTER |
| | Adding Table Comment | ALTER |
| | Removing Table Comment | ALTER |
| | Rename Table Name | ALTER |
| | Adding New Columns | ALTER |
| | Renaming Column Name | ALTER |
| | Dropping Columns | ALTER |
| | Dropping Partitions | ALTER |
| | Changing Column Comment | ALTER |
| | Adding Column Position | ALTER |
| | Changing Column Position | ALTER |
| | Changing Column Type | ALTER |
| | Altering Database Location | ALTER |
| Auxiliary Statements | Describe table | SELECT |
| | Show create table | SELECT |
| | Show columns | SELECT |
| | Show partitions | SELECT |
| | Show Table Extended Partition | SELECT |
| | Analyze table | SELECT |
| Procedures/Call | compact | UPDATE |
| | expire_snapshots | UPDATE |
| | expire_partitions | UPDATE |
| | create_tag | UPDATE |
| | create_tag_from_timestamp | UPDATE |
| | rename_tag | UPDATE |
| | replace_tag | UPDATE |
| | delete_tag | UPDATE |
| | expire_tags | UPDATE |
| | rollback | UPDATE |
| | rollback_to_timestamp | UPDATE |
| | rollback_to_watermark | UPDATE |
| | purge_files | UPDATE |
| | migrate_database | UPDATE |
| | migrate_table | UPDATE/SELECT |
| | remove_orphan_files | UPDATE |
| | remove_unexisting_files | UPDATE |
| | repair | UPDATE |
| | create_branch | UPDATE |
| | delete_branch | UPDATE |
| | fast_forward | UPDATE |
| | reset_consumer | UPDATE |
| | clear_consumers | UPDATE |
| | mark_partition_done | UPDATE |
| | refresh_object_table | UPDATE |
| | compact_manifest | UPDATE |
| | alter_view_dialect | UPDATE |