配置源数据心跳表实现数据判齐功能
操作场景
心跳和数据判齐功能用于统计CDL同步任务的全链路信息, 包括从数据库管理系统RDBMS到Kafka的数据耗时、从Kafka消费数据写入到Hudi的数据耗时和数据条数等一系列信息,并将其写入到特定的Topic(cdl_snapshot_topic)中,用户可自行消费Topic中的数据并写入到某个特定Hudi表作数据判齐使用。心跳判齐数据不仅可以用来判断心跳时间之前的数据已经同步到数据湖,还可以根据事务时间,写Kafka的时间,数据开始入湖时间和数据入湖结束时间来判断数据时延问题。
同时对于PgSQL任务,配置心跳表可以定期向前推进PgSQL中Slot记录的LSN的信息,避免由于某个任务配置了某部分变化很小的表导致数据库日志积压。
配置从Oracle(ogg)抓取数据到Hudi任务的心跳表
- 在需要同步数据的Oracle数据库中执行以下命令创建一张心跳表,心跳表归属于CDC_CDL Schema,表名为CDC_HEARTBEAT,主键为CDL_JOB_ID:
    
    
CREATE TABLE "CDC_CDL"."CDC_HEARTBEAT" (
"CDL_JOB_ID" VARCHAR(22) PRIMARY KEY,
"CDL_LAST_HEARTBEAT" TIMESTAMP,
SUPPLEMENTAL LOG DATA (ALL) COLUMNS
);
 - 将CDC_HEARTBEAT表加入到Oracle或者ogg的任务中,确保心跳数据可以正常发送到Kafka。
 - 在CDL WebUI配置thirdparty-kafka(ogg)连接增加Oracle的连接信息。
    
    

 - 配置完成后,在CDL WebUI界面创建从Oracle(ogg)抓取数据到Hudi任务并启动即可收到心跳数据。
 
配置从Postgresql抓取数据到Hudi任务的心跳表
- 在需要同步的Postgresql数据库下执行以下命令创建一张心跳表,心跳表归属cdc_cdl Schema,表名为cdc_heartbeat,主键为cdl_job_id:
    
    
DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat;
CREATE TABLE cdc_cdl.cdc_heartbeat (
cdl_job_id int8 NOT NULL,
cdl_last_heartbeat timestamp(6)
);
ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id);
 - 心跳表创建完成后,在CDL WebUI界面创建从Postgresql抓取数据到Hudi的同步任务并启动即可收到心跳数据。
 
配置opengauss到Hudi任务的心跳表
- 在需要同步的opengauss数据库下执行以下命令创建一张心跳表,心跳表归属cdc_cdl Schema,表名为cdc_heartbeat,主键为cdl_job_id:
    
    
DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat;
CREATE TABLE cdc_cdl.cdc_heartbeat (
cdl_job_id int8 NOT NULL,
cdl_last_heartbeat timestamp(6)
);
ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id);
 - 将该心跳表加入到DRS任务,以确保心跳表数据正常发送到DRS Kafka。
 - 在CDL WebUI界面配置opengauss的thirdparty-kafka连接时增加opengauss的连接信息,如果opengauss部署为一主多备模式,需在“host”填写所有的IP。
    
    

 - 配置完成之后,在CDL WebUI界面创建从thirdparty-kafka抓取数据到Hudi的任务并启动即可收到心跳数据。
 
数据判齐消息字段含义
| 
        字段名  | 
      
        描述  | 
     
|---|---|
| 
        cdl_job_name  | 
      
        本批次数据所属同步任务名称  | 
     
| 
        target_table_schema  | 
      
        本批次数据写入Schema名称  | 
     
| 
        target_table_name  | 
      
        本批次数据写入Hudi表名称  | 
     
| 
        target_table_path  | 
      
        本批次数据保存的Hudi表路径  | 
     
| 
        total_num  | 
      
        本批次数据总数  | 
     
| 
        cdl_original_heartbeat  | 
      
        本批次数据中包含的心跳数据的最大时间, 如果本批次不包含心跳数据则值为空  | 
     
| 
        cdl_last_heartbeat  | 
      
        本批次数据中包含的心跳数据的最小时间,如果本批次不包含心跳数据则取“event_time_min”的值  | 
     
| 
        insert_num  | 
      
        本批次数据insert事件总数  | 
     
| 
        update_num  | 
      
        本批次数据update事件总数  | 
     
| 
        delete_num  | 
      
        本批次数据delete事件总数  | 
     
| 
        event_time_min  | 
      
        本批次数据源端最小事务提交时间  | 
     
| 
        event_time_max  | 
      
        本批次数据源端最大事务提交时间  | 
     
| 
        event_time_avg  | 
      
        本批次数据源端平均事务提交时间  | 
     
| 
        kafka_timestamp_min  | 
      
        本批次数据发送到Kafka的最小时间  | 
     
| 
        kafka_timestamp_max  | 
      
        本批次数据发送到Kafka的最大时间  | 
     
| 
        begin_time  | 
      
        本批次数据开始写入Hudi的时间  | 
     
| 
        end_time  | 
      
        本批次数据写入Hudi的结束时间  | 
     
| 
        cdc_partitioned_time  | 
      
        心跳表的时间分区字段  | 
     
| 
        cdc_last_update_date  | 
      
        该条判断记录写入时间  | 
     
    
      