Configuring the Source Data Heartbeat Table for Data Integrity Check

Updated on 2024-10-09 GMT+08:00

Scenario

The heartbeat and data consistency check function collects full-link information about CDL synchronization tasks, including the time taken to send data from the source RDBMS to Kafka, the time taken to write the data from Kafka to Hudi, and the number of data records, and writes this information to a dedicated topic (cdl_snapshot_topic). You can consume the data in this topic and write it to a specific Hudi table for data consistency checks. The heartbeat data can be used both to determine whether all data generated before the heartbeat time has been synchronized to the data lake and to calculate the data latency based on the transaction time, the Kafka write time, and the data import start and end times.

In addition, for PostgreSQL tasks, configuring a heartbeat table periodically advances the LSN recorded by the replication slot in the PostgreSQL database. This prevents WAL logs from accumulating when a task contains tables that change infrequently.
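For example, after the messages in cdl_snapshot_topic have been consumed and written to a Hudi table, a query similar to the following can estimate the per-batch latency. This is only a sketch: cdl_consistency_check is a placeholder name for whatever Hudi table you load the check messages into, the query assumes Spark SQL, and the column names are those listed in Table 1 below.

    -- Sketch only: cdl_consistency_check is a placeholder for the Hudi table
    -- that stores the messages consumed from cdl_snapshot_topic.
    SELECT cdl_job_name,
           target_table_name,
           total_num,
           -- source transaction to Kafka
           unix_timestamp(kafka_timestamp_max) - unix_timestamp(event_time_max) AS source_to_kafka_seconds,
           -- Kafka to the end of the Hudi write
           unix_timestamp(end_time) - unix_timestamp(kafka_timestamp_max) AS kafka_to_hudi_seconds,
           -- overall latency from the source transaction to the data lake
           unix_timestamp(end_time) - unix_timestamp(event_time_max) AS total_latency_seconds
    FROM cdl_consistency_check
    ORDER BY end_time DESC;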

Configuring the Heartbeat Table for Capturing Data from Oracle GoldenGate (OGG) to a Hudi Job

  1. To create a heartbeat table, run the following statements in the Oracle database whose data is to be synchronized. The heartbeat table belongs to the CDC_CDL schema, its name is CDC_HEARTBEAT, and its primary key is CDL_JOB_ID.

    CREATE TABLE "CDC_CDL"."CDC_HEARTBEAT" (

    "CDL_JOB_ID" VARCHAR(22) PRIMARY KEY,

    "CDL_LAST_HEARTBEAT" TIMESTAMP,

    SUPPLEMENTAL LOG DATA (ALL) COLUMNS

    );

  2. Add the CDC_HEARTBEAT table to the Oracle or OGG job to ensure that heartbeat data can be properly sent to Kafka.

    NOTE:

    For an Oracle job, skip to Step 4.

  3. Configure the thirdparty-kafka (ogg) link on the CDL web UI and add the Oracle link information.

  4. After the configuration is complete, create a job for capturing data from OGG to Hudi on the CDL web UI and start the job to receive heartbeat data. An optional query for checking that the heartbeat table is being updated is sketched below.
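Optionally, once the job is running, you can check whether the heartbeat table is being updated by querying it in the Oracle database. This is only an illustrative sketch using the table created in Step 1; if heartbeats are being generated for the job, CDL_LAST_HEARTBEAT would be expected to advance over time.

    -- Check that the heartbeat timestamp for each CDL job keeps advancing.
    SELECT CDL_JOB_ID, CDL_LAST_HEARTBEAT
    FROM CDC_CDL.CDC_HEARTBEAT
    ORDER BY CDL_LAST_HEARTBEAT DESC;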

Configuring the Heartbeat Table for Capturing Data from PostgreSQL to a Hudi Job

  1. To create a heartbeat table, run the following statements in the PostgreSQL database to be synchronized. The heartbeat table belongs to the cdc_cdl schema, its name is cdc_heartbeat, and its primary key is cdl_job_id.

    DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat;
    CREATE TABLE cdc_cdl.cdc_heartbeat (
        cdl_job_id int8 NOT NULL,
        cdl_last_heartbeat timestamp(6)
    );
    ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id);

  2. After the heartbeat table is created, create a job for capturing data from PostgreSQL to Hudi on the CDL web UI and start the job to receive heartbeat data. An optional query for checking that the replication slot is keeping up is sketched below.
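As noted in the Scenario section, the heartbeat table lets the task periodically advance the LSN recorded by the replication slot. If you want to confirm that the slot is not accumulating WAL, the following generic PostgreSQL query (not specific to CDL; it uses only standard catalog views and functions) shows how far each slot's confirmed LSN lags behind the current WAL position.

    -- Generic PostgreSQL check: lag between the current WAL position and each
    -- replication slot's confirmed flush LSN. A value that keeps growing
    -- suggests the slot's LSN is not being advanced.
    SELECT slot_name,
           confirmed_flush_lsn,
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS slot_lag
    FROM pg_replication_slots;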

Configuring the Heartbeat Table for Capturing Data from openGauss to a Hudi Job

  1. To create a heartbeat table, run the following statements in the openGauss database to be synchronized. The heartbeat table belongs to the cdc_cdl schema, its name is cdc_heartbeat, and its primary key is cdl_job_id.

    DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat;
    CREATE TABLE cdc_cdl.cdc_heartbeat (
        cdl_job_id int8 NOT NULL,
        cdl_last_heartbeat timestamp(6)
    );
    ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id);

  2. Add the heartbeat table to the DRS job to ensure that heartbeat data is properly sent to the Kafka instance used by DRS.
  3. On the CDL web UI, add the openGauss link information when configuring the thirdparty-kafka link for openGauss. If one primary and multiple standby openGauss nodes are deployed, enter the IP addresses of all nodes in Host.

  4. After the configuration is complete, create a job for capturing data from thirdparty-kafka to Hudi on the CDL web UI and start the job to receive heartbeat data.

Fields in a Data Consistency Check Message

Table 1 Fields in a data consistency check message

cdl_job_name: The name of the synchronization task to which the data in this batch belongs.
target_table_schema: The name of the schema to which the data in this batch is written.
target_table_name: The name of the Hudi table to which the data in this batch is written.
target_table_path: The path of the Hudi table to which the data in this batch is saved.
total_num: The total number of data records in this batch.
cdl_original_heartbeat: The maximum duration of heartbeat data in this batch. If this batch does not contain heartbeat data, the value is empty.
cdl_last_heartbeat: The minimum duration of heartbeat data in this batch. If this batch does not contain heartbeat data, the value of event_time_min is used.
insert_num: The total number of data insert events in this batch.
update_num: The total number of data update events in this batch.
delete_num: The total number of data delete events in this batch.
event_time_min: The minimum transaction submission time of the data source in this batch.
event_time_max: The maximum transaction submission time of the data source in this batch.
event_time_avg: The average transaction submission time of the data source in this batch.
kafka_timestamp_min: The minimum time for sending data in this batch to Kafka.
kafka_timestamp_max: The maximum time for sending data in this batch to Kafka.
begin_time: The time when the data in this batch starts to be written to Hudi.
end_time: The time when the data in this batch finishes being written to Hudi.
cdc_partitioned_time: The time partition field in the heartbeat table.
cdc_last_update_date: The time when the check record is written.
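As an illustration of how these fields can be combined, the following sketch runs against the same placeholder Hudi table used in the Scenario example (cdl_consistency_check). The first query looks for batches whose insert/update/delete counts do not account for total_num (heartbeat records in a batch, if any, may legitimately explain small differences); the second summarizes, per target table, the latest heartbeat time versus the latest source transaction time, which is the comparison used to judge whether data before the heartbeat time has reached the lake.

    -- Batches whose event counts do not add up to total_num (hedged check).
    SELECT cdl_job_name, target_table_name, begin_time,
           total_num, insert_num, update_num, delete_num
    FROM cdl_consistency_check
    WHERE insert_num + update_num + delete_num <> total_num;

    -- Latest heartbeat time versus latest source transaction time per table.
    SELECT cdl_job_name, target_table_name,
           max(cdl_last_heartbeat) AS latest_heartbeat,
           max(event_time_max)     AS latest_event_time
    FROM cdl_consistency_check
    GROUP BY cdl_job_name, target_table_name;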
