Hudi任务同步
创建Hudi任务
迁移场景
如果DWS表已经通过CDL导入数据,改为用SQL on Hudi方式迁移数据。或者使用CDM做全量初始化后,继续使用SQL on Hudi方式同步增量数据。
- 创建hudi.hudi_sync_state同步状态表,需要管理员权限。
     
     
1SELECT pg_catalog.create_hudi_sync_table();
通常情况下,每个数据库中只创建一次hudi.hudi_sync_state。
 - 设置同步进度,用户需要有同步目标表的INSERT和UPDATE权限、HUDI外表的SELECT权限,否则无法正常设置同步进度。
     
     
1SELECT hudi_set_sync_commit('SCHEMA.TABLE', 'SCHEMA.FOREIGN_TABLE', 'LATEST_COMMIT');
其中:
- SCHEMA.TABLE,表示同步数据的目标表名,schema为模式名。
 - SCHEMA.FOREIGN_TABLE,表示OBS外表名,schema为模式名。
 - LATEST_COMMIT,表示已同步的Hudi数据时间截点。
 
示例:目标表public.in_rel,已经同步hudi的数据到20220913152131,切换到SQL on Hudi方式从OBS外表hudi_read1中继续导出数据。
1SELECT hudi_set_sync_commit('public.in_rel', 'public.hudi_read1', '20220913152131');
 - 提交Hudi同步任务。
     
     
1SELECT hudi_sync_task_submit('SCHEMA.TABLE', 'SCHEMA.FOREIGN_TABLE');
例如:目标表public.in_rel,切换到SQL on Hudi方式从OBS外表hudi_read1中继续导出数据。
1SELECT hudi_sync_task_submit('public.in_rel', 'public.hudi_read1');
 
新建场景
DWS表为空,第一次从Hudi同步数据,可执行以下命令直接创建任务。
        1
         | 
       
        SELECT hudi_sync_task_submit('SCHEMA.TABLE', 'SCHEMA.FOREIGN_TABLE');  | 
      
查询Hudi同步任务
查询Hudi同步任务。
语法:
         1
          | 
        
         SELECT * FROM pg_task_show('SQLonHudi');  | 
       
查询结果中的task_id是Hudi同步任务唯一标识。
示例:
         1 2 3 4 5  | 
        
         SELECT * FROM pg_task_show('SQLonHudi'); task_id | what | category_id | userid | is_broken | interval | time_cons | start_time | end_time | parameter | task_rank | next_start_time | next_end_time | last_log | failure_times --------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------+-----------+------------+-----------+-------------------------------+----------+-----------+-----------+-------------------------------+-------------------------------+----------+--------------- 64d257e9-1e9b-0d00-3ce3-7e61b5e0fffe | call pg_catalog.hudi_sync_custom('public.hudi_read_target', 'public.hudi_read101', '{"_hoodie_commit_seqno" : "_hoodie_commit_seqno", "id" : "id", "ts" : "ts", "long_field" : "ts"}'); | SQLonHudi | 10 | f | '00:00:10' | | 2023-08-08 22:58:15.846903+08 | | | 5 | 2023-08-08 22:58:15.846903+08 | 2023-08-08 22:58:24.846903+08 | | 0 (1 row)  | 
       
暂停Hudi同步任务
通过指定的task_id,暂停对应的Hudi同步任务。
语法:
         1
          | 
        
         SELECT pg_task_pause('task_id');  | 
       
示例:
暂停task_id为64479410-a04c-0700-d150-3037d700fffe的同步任务。
         1
          | 
        
         SELECT pg_task_pause('64479410-a04c-0700-d150-3037d700fffe');  | 
       
恢复Hudi同步任务
通过指定的task_id,恢复对应的Hudi同步任务。
语法:
         1
          | 
        
         SELECT pg_task_resume('task_id');  | 
       
示例:
恢复task_id为64479410-a04c-0700-d150-3037d700fffe的同步任务。
         1
          | 
        
         SELECT pg_task_resume('64479410-a04c-0700-d150-3037d700fffe');  | 
       
复位连续失败的Hudi同步任务
通过指定连续失败Hudi任务的task_id,复位连续失败的Hudi同步任务。
连续失败次数大于等于10的任务会自动暂停,需手动调用pg_task_resume()函数复位。
         1
          | 
        
         SELECT pg_task_resume('task_id');  | 
       
删除Hudi同步任务
通过指定的task_id,删除对应的Hudi同步任务。
语法:
         1
          | 
        
         SELECT pg_task_remove('task_id');  | 
       
示例:
删除task_id为64479410-a04c-0700-d150-3037d700fffe的同步任务。
         1
          | 
        
         SELECT pg_task_remove('64479410-a04c-0700-d150-3037d700fffe');  | 
       
查询历史Hudi同步任务信息
使用视图hudi_sync_state_history_view查询Hudi历史同步任务信息,该视图仅9.1.0及以上集群版本支持。
         1
          | 
        
         SELECT * FROM pg_catalog.hudi_sync_state_history_view;  | 
       
| 
        名称  | 
      
        类型  | 
      
        描述  | 
     
|---|---|---|
| 
        task_id  | 
      
        text  | 
      
        任务ID。  | 
     
| 
        target_tbl  | 
      
        text  | 
      
        同步目标表名。  | 
     
| 
        source_ftbl  | 
      
        text  | 
      
        同步源表名(外表)。  | 
     
| 
        latest_commit  | 
      
        text  | 
      
        最近一次同步成功的时间戳。  | 
     
| 
        latest_sync_count  | 
      
        bigint  | 
      
        最近一次同步成功的行数。  | 
     
| 
        latest_sync_start  | 
      
        timestamp with time zone  | 
      
        最近一次同步任务开始的时间。  | 
     
| 
        latest_sync_end  | 
      
        timestamp with time zone  | 
      
        最近一次同步任务结束的时间。  | 
     
| 
        hudi_flushdisk_time  | 
      
        text  | 
      
        hudi文件落盘时间。  | 
     
查询Hudi同步任务状态
使用函数hudi_show_sync_state()查询Hudi同步任务状态。
         1
          | 
        
         SELECT * FROM hudi_show_sync_state();  |