Hudi系统函数
Hudi系统函数仅8.2.1.100及以上版本支持。
pg_show_custom_settings()
描述:查询Hudi外表参数设置详情。
返回值类型:setof record
示例:
1 2 3 4 5 6 7 8 9 10 11 12 |
SELECT * FROM pg_show_custom_settings(); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline ----------------------------------------------------+-------------------+------+--------------------+--------------------------+------------+---------+---------+---------+---------+---------+ ----------+----------+-----------+------------+------------ hoodie.public.hudi_mor_ft.consume.ending.timestamp | 20230404172329544 | | Customized Options | GUC placeholder variable | | user | string | session | | | | | | | hoodie.public.hudi_mor_ft.consume.mode | incremental | | Customized Options | GUC placeholder variable | | user | string | session | | | | | | | hoodie.public.hudi_mor_ft.consume.start.timestamp | 20230404172329543 | | Customized Options | GUC placeholder variable | | user | string | session | | | | | | | (3 rows) |
hudi_get_options(regclass)
描述:查询Hudi外表的属性信息(hoodie.properties)。以key-value键值对表示。
返回值类型:setof record
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
select * from hudi_get_options('public.hudi_mor_ft'); key | value -------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------- hoodie.table.precombine.field | col_int hoodie.datasource.write.drop.partition.columns | false hoodie.table.partition.fields | hoodie.table.type | MERGE_ON_READ hoodie.archivelog.folder | archived hoodie.compaction.payload.class | org.apache.hudi.common.model.OverwriteWithLatestAvroPayload hoodie.timeline.layout.version | 1 hoodie.table.version | 4 hoodie.table.recordkey.fields | col_bigint hoodie.database.name | default hoodie.datasource.write.partitionpath.urlencode | false hoodie.table.name | lt_test_mor_014 hoodie.table.keygenerator.class | org.apache.hudi.keygen.ComplexKeyGenerator hoodie.datasource.write.hive_style_partitioning | true hoodie.table.create.schema | {"type"\:"record","name"\:"lt_test_mor_014_record","namespace"\:"hoodie.lt_test_mor_014","fields"\:[{"name"\:"_hoodie_commit_time","type"\:[ "string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null "]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"col_bigint","type"\:["long","null"]},{"name"\:"col_int","type"\:["int","null"]},{"name"\:"col_text","type"\:["string","nu ll"]},{"name"\:"col_text2","type"\:["string","null"]}]} hoodie.table.checksum | 515660817 (16 rows) |
hudi_get_max_commit(regclass)
描述:获取当前Hudi外表最新commit的时间戳和数据写入时间。
返回值类型:record
示例:
1 2 3 4 5 |
SELECT * FROM hudi_get_max_commit('public.hudi_mor_ft'); max_commit | write_time ----------------+------------------------ 20221207141822 | 2022-12-07 14:18:30+08 (1 row) |
hudi_get_commit(regclass, cstring, int)
描述:获取当前Hudi外表从指定commit开始,到第N个commit的时间戳和对应commit数据写入时间。当第N个commit不存在,则返回最新的commit和对应数据写入时间。该函数仅9.1.0.100及以上版本支持。
返回值类型:record
示例:
1 2 3 4 5 |
SELECT * FROM hudi_get_commit('public.hudi_mor_ft', '20230329174744657', 3); end_commit | write_time -------------------+------------------------ 20230329174808908 | 2023-08-31 15:43:08+08 (1 row) |
hudi_sync_task_submit(regclass, regclass)
描述:提交Hudi自动同步任务。第一个入参为同步目标表,第二个入参为HUDI外表。提交任务成功会返回任务的task-id。
返回值类型:text
- 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
- 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
- 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
- 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。
示例:
1 2 3 4 5 |
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft'); hudi_sync_task_submit -------------------------------------- 6465efe2-3ea1-0b00-dde5-b57dfb30fffe (1 row) |
hudi_sync_task_submit(regclass, regclass, interval)
描述:功能与hudi_sync_task_submit(regclass, regclass)一致。不同之处在于,用户可以额外指定一个interval类型入参,用于指定任务的调度周期,取值范围为5秒至24小时。提交任务成功会返回任务的task-id。该函数仅8.2.1.300及以上版本支持。
返回值类型:text
- 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
- 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
- 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
- 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。
示例:
1 2 3 4 5 |
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft','1 hour'); hudi_sync_task_submit -------------------------------------- 6465efe2-3ea1-0b00-dde5-b57dfb30fffe (1 row) |
hudi_sync_task_submit(regclass, regclass, text, text)
描述:功能与hudi_sync_task_submit(regclass, regclass)一致,不同之处在于,用户可以额外指定两个text入参,表示用户期望哪些字段被同步。字段之间使用','分隔,支持引号和转义字符的解析。两个text参数的字段数量和顺序应当一致,表示同步字段之间的对应关系。提交任务成功会返回任务的task-id。
返回值类型:text
- 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
- 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
- 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
- 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。
示例:
1 2 3 4 5 |
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft','_hoodie_commit_time, col_bigint, col_text', '_hoodie_commit_time, col_bigint, col_text'); hudi_sync_task_submit -------------------------------------- 646610bc-cdd1-0d00-d07d-b57e89a0fffe (1 row) |
hudi_sync_task_submit(regclass, regclass, text, text, interval)
描述:功能与hudi_sync_task_submit(regclass, regclass, text, text)一致,不同之处在于,用户可以额外指定一个interval类型入参,用于指定任务的调度周期,取值范围为5秒至24小时。该函数仅8.2.1.300及以上版本支持。
返回值类型:text
- 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
- 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
- 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
- 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。
示例:
1 2 3 4 5 |
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft','_hoodie_commit_time, col_bigint, col_text', '_hoodie_commit_time, col_bigint, col_text', '10 minute 30second'); hudi_sync_task_submit -------------------------------------- 646610bc-cdd1-0d00-d07d-b57e89a0fffe (1 row) |
hudi_show_sync_state()
描述:获取Hudi自动同步任务的同步状态。
返回值类型:setof record
示例:
1 2 3 4 5 |
SELECT * FROM hudi_show_sync_state(); target_tbl | source_ftbl | payload_type | precombine_key | latest_commit --------------------+--------------------+-------------------------------------------------------------+----------------+------------------- public.hudi_sync_i | public.hudi_mor_ft | org.apache.hudi.common.model.OverwriteWithLatestAvroPayload | col_int | 20230511114021573 (1 row) |
hudi_sync(regclass, regclass)
描述:存储过程,Hudi自动同步任务调用入口。使用pg_catalog.hudi_sync_task_submit(regclass, regclass)提交的任务会执行该存储过程。执行成功会提示同步的行数和时间戳。
返回值类型:text
示例:
1 2 3 4 5 6 7 |
CALL hudi_sync('public.hudi_sync_i', 'public.hudi_mor_ft'); NOTICE: execute full sync CONTEXT: PL/pgSQL function hudi_sync(regclass,regclass) line 11 at RETURN hudi_sync -------------------------------------- sync 1 rows up to 20230511114021573. (1 row) |
hudi_sync_custom(regclass, regclass, text)
描述:存储过程,Hudi自动同步任务调用入口,支持用户自定义目标表和数据源表的字段同步对应关系。使用pg_catalog.hudi_sync_task_submit(regclass, regclass, text, text)提交的任务会执行该存储过程。其中text为json风格字符串,表示两张表字段间的同步对应关系。执行成功会提示同步的行数和时间戳。
返回值类型:text
示例:
1 2 3 4 5 6 7 |
CALL hudi_sync_custom('public.hudi_sync_i', 'public.hudi_mor_ft', '{"_hoodie_commit_time" : "_hoodie_commit_time", "col_bigint" : "col_bigint", "col_text" : "col_text"}'); NOTICE: execute full sync CONTEXT: PL/pgSQL function hudi_sync_custom(regclass,regclass,text) line 14 at RETURN hudi_sync_custom -------------------------------------- sync 1 rows up to 20230511114021573. (1 row) |
hudi_set_sync_commit(regclass, regclass, text)
描述:设置Hudi自动同步任务首次同步的起点时间戳,避免在已经同步了部分数据的情况下,重新同步已有数据。第一个参数为同步目标表,第二个参数为hudi外表,第三个参数为用户期望的同步起点。该函数需要在提交同步任务之前使用。该函数仅8.2.1.210及以上版本支持。
返回值类型:text
示例:
1 2 3 4 5 6 7 |
select hudi_set_sync_commit('public.hudi_sync_i', 'public.hudi_mor_ft', '20230511114021573'); NOTICE: set sync commit successfully, the next synchronization will start from 20230511114021573 CONTEXT: referenced column: hudi_set_sync_commit hudi_set_sync_commit ---------------------- 20230511114021573 (1 row) |
用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常设置同步进度。
hudi_set_sync_commit(text, text)
描述:设置Hudi自动同步任务下一次同步的起点时间戳,可以用于重复同步历史数据或者跳过某些数据。第一个参数为任务id,第二个参数为用户期望的下一次同步起点。该函数需要在提交同步任务后使用,并且使用前需要暂停任务。该函数仅8.2.1.210及以上版本支持。
返回值类型:text
示例:
1 2 3 4 5 6 7 |
select hudi_set_sync_commit('6524c8e3-aae9-0000-5a14-be8ec000fffe', '20230511114021573'); NOTICE: set sync commit successfully, the next synchronization will start from 20230511114021573 CONTEXT: referenced column: hudi_set_sync_commit hudi_set_sync_commit ---------------------- 20230511114021573 (1 row) |
- 只有拥有目标任务权限的用户才能成功调用该函数。
- 调用该函数前,目标任务需要处于暂停状态,且至少成功执行过一次。
pg_task_show(text)
描述:查询当前自动调度任务信息,对于hudi同步任务,入参应该是'SQLonHudi'。
返回值类型:setof record
示例:
1 2 3 4 5 |
SELECT * FROM pg_task_show('SQLonHudi'); task_id | what | category_id | userid | is_broken | interval | time_cons | start_time | end_time | parameter | task_rank | next_start_time | next_end_time | last_log | failure_times --------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------+-----------+------------+-----------+-------------------------------+----------+-----------+-----------+-------------------------------+-------------------------------+----------+--------------- 64d257e9-1e9b-0d00-3ce3-7e61b5e0fffe | call pg_catalog.hudi_sync_custom('public.hudi_read_target', 'public.hudi_read101', '{"_hoodie_commit_seqno" : "_hoodie_commit_seqno", "id" : "id", "ts" : "ts", "long_field" : "ts"}'); | SQLonHudi | 10 | f | '00:00:10' | | 2023-08-08 22:58:15.846903+08 | | | 5 | 2023-08-08 22:58:15.846903+08 | 2023-08-08 22:58:24.846903+08 | | 0 (1 row) |
last_log和failure_times字段用于记录上一次任务的状态。
- last_log于任务结束时刷新,若任务成功,则清空内容;若任务失败则记录任务的失败日志。
- failure_times于时间窗结束时刷新,若任务成功,置为0,失败则累加1,本轮未拉起则不变,可用于推断第一次失败出现的时间。
pg_task_remove(text)
描述:删除某个自动调度任务,入参为该任务的task-id,函数返回被删除任务的个数。
返回值类型:integer
示例:
1 2 3 4 5 |
SELECT pg_task_remove('64661705-8ada-0100-d07f-b57e89a0fffe'); pg_task_remove ---------------- 1 (1 row) |
pg_task_pause(text)
描述:暂停某个自动调度任务,入参为该任务的task-id,函数返回被暂停任务的个数。
返回值类型:integer
示例:
1 2 3 4 5 |
SELECT pg_task_pause('64661705-8ada-0100-d07f-b57e89a0fffe'); pg_task_pause --------------- 1 (1 row) |
pg_task_resume(text)
描述:恢复某个自动调度任务执行,入参为该任务的task-id。函数返回被恢复执行任务的个数。该函数仅8.2.1.300及以上版本支持。
返回值类型:integer
示例:
1 2 3 4 5 |
SELECT pg_task_resume('64661705-8ada-0100-d07f-b57e89a0fffe'); pg_task_resume ---------------- 1 (1 row) |
pg_task_reset_interval(text, interval)
描述:修改某个同步任务的调度周期,第一个入参为任务的task_id,第二个入用于指定任务的调度周期,取值范围为5秒至24小时。函数返回被修改周期的任务的个数。该函数仅8.2.1.300及以上版本支持。
返回值类型:integer
示例:
1 2 3 4 5 |
select pg_task_reset_interval('64bfd69c-a016-0000-120e-1e802978fffe', '10 hours 30 minutes'); pg_task_reset_interval ------------------------ 1 (1 row) |