更新时间:2024-12-06 GMT+08:00
分享

Hudi系统函数

Hudi系统函数仅8.2.1.100及以上版本支持。

pg_show_custom_settings()

描述:查询Hudi外表参数设置详情。

返回值类型:setof record

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
SELECT * FROM pg_show_custom_settings();
                        name                        |      setting      | unit |      category      |        short_desc        | extra_desc | context | vartype | source  | min_val | max_val |
 enumvals | boot_val | reset_val | sourcefile | sourceline
----------------------------------------------------+-------------------+------+--------------------+--------------------------+------------+---------+---------+---------+---------+---------+
----------+----------+-----------+------------+------------
 hoodie.public.hudi_mor_ft.consume.ending.timestamp | 20230404172329544 |      | Customized Options | GUC placeholder variable |            | user    | string  | session |         |         |
          |          |           |            |
 hoodie.public.hudi_mor_ft.consume.mode             | incremental       |      | Customized Options | GUC placeholder variable |            | user    | string  | session |         |         |
          |          |           |            |
 hoodie.public.hudi_mor_ft.consume.start.timestamp  | 20230404172329543 |      | Customized Options | GUC placeholder variable |            | user    | string  | session |         |         |
          |          |           |            |
(3 rows)

hudi_get_options(regclass)

描述:查询Hudi外表的属性信息(hoodie.properties)。以key-value键值对表示。

返回值类型:setof record

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
select * from hudi_get_options('public.hudi_mor_ft');
                       key                       |
                                                                                                                                                  value


-------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------
 hoodie.table.precombine.field                   | col_int
 hoodie.datasource.write.drop.partition.columns  | false
 hoodie.table.partition.fields                   |
 hoodie.table.type                               | MERGE_ON_READ
 hoodie.archivelog.folder                        | archived
 hoodie.compaction.payload.class                 | org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
 hoodie.timeline.layout.version                  | 1
 hoodie.table.version                            | 4
 hoodie.table.recordkey.fields                   | col_bigint
 hoodie.database.name                            | default
 hoodie.datasource.write.partitionpath.urlencode | false
 hoodie.table.name                               | lt_test_mor_014
 hoodie.table.keygenerator.class                 | org.apache.hudi.keygen.ComplexKeyGenerator
 hoodie.datasource.write.hive_style_partitioning | true
 hoodie.table.create.schema                      | {"type"\:"record","name"\:"lt_test_mor_014_record","namespace"\:"hoodie.lt_test_mor_014","fields"\:[{"name"\:"_hoodie_commit_time","type"\:[
"string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null
"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"col_bigint","type"\:["long","null"]},{"name"\:"col_int","type"\:["int","null"]},{"name"\:"col_text","type"\:["string","nu
ll"]},{"name"\:"col_text2","type"\:["string","null"]}]}
 hoodie.table.checksum                           | 515660817
(16 rows)

hudi_get_max_commit(regclass)

描述:获取当前Hudi外表最新commit的时间戳和数据写入时间。

返回值类型:record

示例:

1
2
3
4
5
SELECT * FROM hudi_get_max_commit('public.hudi_mor_ft');
   max_commit   |       write_time
----------------+------------------------
 20221207141822 | 2022-12-07 14:18:30+08
(1 row)

hudi_get_commit(regclass, cstring, int)

描述:获取当前Hudi外表从指定commit开始,到第N个commit的时间戳和对应commit数据写入时间。当第N个commit不存在,则返回最新的commit和对应数据写入时间。该函数仅9.1.0.100及以上版本支持。

返回值类型:record

示例:

1
2
3
4
5
SELECT * FROM hudi_get_commit('public.hudi_mor_ft', '20230329174744657', 3);
    end_commit     |       write_time       
-------------------+------------------------
 20230329174808908 | 2023-08-31 15:43:08+08
(1 row)

hudi_sync_task_submit(regclass, regclass)

描述:提交Hudi自动同步任务。第一个入参为同步目标表,第二个入参为HUDI外表。提交任务成功会返回任务的task-id。

返回值类型:text

  • 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
  • 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
  • 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
  • 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。

示例:

1
2
3
4
5
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft');
        hudi_sync_task_submit
--------------------------------------
 6465efe2-3ea1-0b00-dde5-b57dfb30fffe
(1 row)

hudi_sync_task_submit(regclass, regclass, interval)

描述:功能与hudi_sync_task_submit(regclass, regclass)一致。不同之处在于,用户可以额外指定一个interval类型入参,用于指定任务的调度周期,取值范围为5秒至24小时。提交任务成功会返回任务的task-id。该函数仅8.2.1.300及以上版本支持。

返回值类型:text

  • 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
  • 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
  • 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
  • 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。

示例:

1
2
3
4
5
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft','1 hour');
        hudi_sync_task_submit
--------------------------------------
 6465efe2-3ea1-0b00-dde5-b57dfb30fffe
(1 row)

hudi_sync_task_submit(regclass, regclass, text, text)

描述:功能与hudi_sync_task_submit(regclass, regclass)一致,不同之处在于,用户可以额外指定两个text入参,表示用户期望哪些字段被同步。字段之间使用','分隔,支持引号和转义字符的解析。两个text参数的字段数量和顺序应当一致,表示同步字段之间的对应关系。提交任务成功会返回任务的task-id。

返回值类型:text

  • 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
  • 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
  • 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
  • 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。

示例:

1
2
3
4
5
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft','_hoodie_commit_time, col_bigint, col_text', '_hoodie_commit_time, col_bigint, col_text');
        hudi_sync_task_submit
--------------------------------------
 646610bc-cdd1-0d00-d07d-b57e89a0fffe
(1 row)

hudi_sync_task_submit(regclass, regclass, text, text, interval)

描述:功能与hudi_sync_task_submit(regclass, regclass, text, text)一致,不同之处在于,用户可以额外指定一个interval类型入参,用于指定任务的调度周期,取值范围为5秒至24小时。该函数仅8.2.1.300及以上版本支持。

返回值类型:text

  • 同步目标表必须包含主键,且主键需要与hudi recordkey一致。
  • 如果hudi表包含precombine字段,那么同步目标表也必须包含与之对应的字段。
  • 如果同步目标表只包含主键(除了主键外没有其他字段),则无法正常提交同步任务。
  • 用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常提交同步任务。

示例:

1
2
3
4
5
SELECT hudi_sync_task_submit('public.hudi_sync_i','public.hudi_mor_ft','_hoodie_commit_time, col_bigint, col_text', '_hoodie_commit_time, col_bigint, col_text', '10 minute 30second');
        hudi_sync_task_submit
--------------------------------------
 646610bc-cdd1-0d00-d07d-b57e89a0fffe
(1 row)

hudi_show_sync_state()

描述:获取Hudi自动同步任务的同步状态。

返回值类型:setof record

示例:

1
2
3
4
5
SELECT * FROM hudi_show_sync_state();
     target_tbl     |    source_ftbl     |                        payload_type                         | precombine_key |   latest_commit
--------------------+--------------------+-------------------------------------------------------------+----------------+-------------------
 public.hudi_sync_i | public.hudi_mor_ft | org.apache.hudi.common.model.OverwriteWithLatestAvroPayload | col_int        | 20230511114021573
(1 row)

hudi_sync(regclass, regclass)

描述:存储过程,Hudi自动同步任务调用入口。使用pg_catalog.hudi_sync_task_submit(regclass, regclass)提交的任务会执行该存储过程。执行成功会提示同步的行数和时间戳。

返回值类型:text

示例:

1
2
3
4
5
6
7
CALL hudi_sync('public.hudi_sync_i', 'public.hudi_mor_ft');
NOTICE:  execute full sync
CONTEXT:  PL/pgSQL function hudi_sync(regclass,regclass) line 11 at RETURN
              hudi_sync
--------------------------------------
 sync 1 rows up to 20230511114021573.
(1 row)

hudi_sync_custom(regclass, regclass, text)

描述:存储过程,Hudi自动同步任务调用入口,支持用户自定义目标表和数据源表的字段同步对应关系。使用pg_catalog.hudi_sync_task_submit(regclass, regclass, text, text)提交的任务会执行该存储过程。其中text为json风格字符串,表示两张表字段间的同步对应关系。执行成功会提示同步的行数和时间戳。

返回值类型:text

示例:

1
2
3
4
5
6
7
CALL hudi_sync_custom('public.hudi_sync_i', 'public.hudi_mor_ft', '{"_hoodie_commit_time" : "_hoodie_commit_time", "col_bigint" : "col_bigint", "col_text" : "col_text"}');
NOTICE:  execute full sync
CONTEXT:  PL/pgSQL function hudi_sync_custom(regclass,regclass,text) line 14 at RETURN
           hudi_sync_custom
--------------------------------------
 sync 1 rows up to 20230511114021573.
(1 row)

hudi_set_sync_commit(regclass, regclass, text)

描述:设置Hudi自动同步任务首次同步的起点时间戳,避免在已经同步了部分数据的情况下,重新同步已有数据。第一个参数为同步目标表,第二个参数为hudi外表,第三个参数为用户期望的同步起点。该函数需要在提交同步任务之前使用。该函数仅8.2.1.210及以上版本支持。

返回值类型:text

示例:

1
2
3
4
5
6
7
select hudi_set_sync_commit('public.hudi_sync_i', 'public.hudi_mor_ft', '20230511114021573');
NOTICE:  set sync commit successfully, the next synchronization will start from 20230511114021573
CONTEXT:  referenced column: hudi_set_sync_commit
 hudi_set_sync_commit
----------------------
 20230511114021573
(1 row)

用户需要有同步目标表的insert和update权限、HUDI外表的select权限,否则无法正常设置同步进度。

hudi_set_sync_commit(text, text)

描述:设置Hudi自动同步任务下一次同步的起点时间戳,可以用于重复同步历史数据或者跳过某些数据。第一个参数为任务id,第二个参数为用户期望的下一次同步起点。该函数需要在提交同步任务后使用,并且使用前需要暂停任务。该函数仅8.2.1.210及以上版本支持。

返回值类型:text

示例:

1
2
3
4
5
6
7
select hudi_set_sync_commit('6524c8e3-aae9-0000-5a14-be8ec000fffe', '20230511114021573');
NOTICE:  set sync commit successfully, the next synchronization will start from 20230511114021573
CONTEXT:  referenced column: hudi_set_sync_commit
 hudi_set_sync_commit
----------------------
 20230511114021573
(1 row)
  • 只有拥有目标任务权限的用户才能成功调用该函数。
  • 调用该函数前,目标任务需要处于暂停状态,且至少成功执行过一次。

pg_task_show(text)

描述:查询当前自动调度任务信息,对于hudi同步任务,入参应该是'SQLonHudi'。

返回值类型:setof record

示例:

1
2
3
4
5
SELECT * FROM pg_task_show('SQLonHudi');
              task_id                |                                                                                          what                                                                                           | category_id | userid | is_broken |  interval  | time_cons |          start_time           | end_time | parameter | task_rank |        next_start_time        |         next_end_time         | last_log | failure_times 
--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------+-----------+------------+-----------+-------------------------------+----------+-----------+-----------+-------------------------------+-------------------------------+----------+---------------
 64d257e9-1e9b-0d00-3ce3-7e61b5e0fffe | call pg_catalog.hudi_sync_custom('public.hudi_read_target', 'public.hudi_read101', '{"_hoodie_commit_seqno" : "_hoodie_commit_seqno", "id" : "id", "ts" : "ts", "long_field" : "ts"}'); | SQLonHudi   |     10 | f         | '00:00:10' |           | 2023-08-08 22:58:15.846903+08 |          |           |         5 | 2023-08-08 22:58:15.846903+08 | 2023-08-08 22:58:24.846903+08 |          |             0
(1 row)

last_log和failure_times字段用于记录上一次任务的状态。

  • last_log于任务结束时刷新,若任务成功,则清空内容;若任务失败则记录任务的失败日志。
  • failure_times于时间窗结束时刷新,若任务成功,置为0,失败则累加1,本轮未拉起则不变,可用于推断第一次失败出现的时间。

pg_task_remove(text)

描述:删除某个自动调度任务,入参为该任务的task-id,函数返回被删除任务的个数。

返回值类型:integer

示例:

1
2
3
4
5
SELECT pg_task_remove('64661705-8ada-0100-d07f-b57e89a0fffe');
 pg_task_remove
----------------
              1
(1 row)

pg_task_pause(text)

描述:暂停某个自动调度任务,入参为该任务的task-id,函数返回被暂停任务的个数。

返回值类型:integer

示例:

1
2
3
4
5
SELECT pg_task_pause('64661705-8ada-0100-d07f-b57e89a0fffe');
 pg_task_pause
---------------
             1
(1 row)

pg_task_resume(text)

描述:恢复某个自动调度任务执行,入参为该任务的task-id。函数返回被恢复执行任务的个数。该函数仅8.2.1.300及以上版本支持。

返回值类型:integer

示例:

1
2
3
4
5
SELECT pg_task_resume('64661705-8ada-0100-d07f-b57e89a0fffe');
 pg_task_resume
----------------
              1
(1 row)

pg_task_reset_interval(text, interval)

描述:修改某个同步任务的调度周期,第一个入参为任务的task_id,第二个入用于指定任务的调度周期,取值范围为5秒至24小时。函数返回被修改周期的任务的个数。该函数仅8.2.1.300及以上版本支持。

返回值类型:integer

示例:

1
2
3
4
5
select pg_task_reset_interval('64bfd69c-a016-0000-120e-1e802978fffe', '10 hours 30 minutes');
pg_task_reset_interval
------------------------
1
(1 row)

相关文档