Updated on 2024-11-29 GMT+08:00

CDL JSON and Open-Source Debezium JSON

Data Formats

Source databases use different Change Data Capture (CDC) modes. To simplify data consumption, a CDL data synchronization task converts database change information into a unified format and then writes it to MRS Kafka and Hudi.

Currently, both CDL JSON and open-source Debezium JSON are supported when the data source is DRS-openGauss-JSON, PostgreSQL, or openGauss. For all other data sources, only CDL JSON is supported.

Both CDL JSON and open-source Debezium JSON messages consist of two parts: payload and schema. Payload holds the database change data. Schema is the metadata of payload and provides detailed field information.
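As a minimal sketch, assuming a message has already been read from Kafka as a JSON string, the two parts can be separated as shown below. The function name split_message and the trimmed message are illustrative only and are not part of CDL.

```python
import json


def split_message(raw: str):
    """Return the (schema, payload) parts of a CDL JSON or Debezium JSON message."""
    message = json.loads(raw)
    return message["schema"], message["payload"]


# Trimmed, hypothetical message used only for illustration.
raw = (
    '{"schema": {"type": "struct", "fields": [], "optional": false},'
    ' "payload": {"message_version": "1.0"}}'
)
schema, payload = split_message(raw)
print(schema["type"])              # type of the top-level schema node
print(payload["message_version"])  # format marker carried in the payload
```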

Example Database Data Changes

Data changes in CDL JSON and open-source Debezium JSON formats involve the fields listed in Table 1.

Table 1 Data format changes

| CDL JSON Field | Open-Source Debezium JSON Field | Description |
|---|---|---|
| message_version | message_version | Format of the data to be synchronized. The options are 1.0 (CDL JSON) and 2.0 (open-source Debezium JSON). |
| schema | schema | Payload data structure |
| payload | payload | Database data to be synchronized |
| before | before | Data before the change |
| data | after | Data after the change |
| - | source.version | Debezium version |
| DATA_STORE | source.connector | Type of the source connector |
| - | source.name | Name of the source connector |
| TIMESTAMP | source.ts_ms | Timestamp when the database change occurred |
| - | source.snapshot | Whether the change is part of a snapshot. The value can be true or false. |
| - | source.db | Name of the database to be synchronized |
| SEG_OWNER | source.schema | Schema of the database to be synchronized |
| TABLE_NAME | source.table | Table to be synchronized |
| transaction.properties.lsn | source.lsn | Log sequence number (LSN) |
| transaction.properties.txId | source.txId | Transaction ID |
| OPERATION | op | Data operation type. CDL JSON uses INSERT, UPDATE, and DELETE. Debezium JSON uses c (insert), u (update), and d (delete). |
| unique | unique | Primary key |
| - | ts_ms | Timestamp when the source connector processed the event |
| LOB_COLUMNS | LOB_COLUMNS | Field name of large object data |
| HEARTBEAT_IDENTIFIER | HEARTBEAT_IDENTIFIER | Heartbeat identifier |
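The field correspondence in Table 1 can be used to consume both formats through one code path. The following is a sketch only: the normalize function, the DEBEZIUM_OPS mapping, and the output key names are hypothetical choices for illustration, and the two payloads are trimmed copies of the sample payloads in this section. It assumes that transaction.properties is a list of name/value pairs, as in the CDL JSON sample.

```python
from typing import Any, Dict

# Debezium op codes mapped to the CDL JSON operation names listed in Table 1.
DEBEZIUM_OPS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE"}


def normalize(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Map a CDL JSON (1.0) or Debezium JSON (2.0) payload to one record shape."""
    version = payload.get("message_version")
    if version == "1.0":
        # CDL JSON keeps lsn and txId as {"name", "value"} pairs.
        pairs = (payload.get("transaction") or {}).get("properties", [])
        props = {p["name"]: p["value"] for p in pairs}
        return {
            "connector": payload["DATA_STORE"],
            "schema": payload["SEG_OWNER"],
            "table": payload["TABLE_NAME"],
            "operation": payload["OPERATION"],
            "timestamp": payload["TIMESTAMP"],
            "before": payload.get("before"),
            "after": payload.get("data"),
            "lsn": props.get("lsn"),
            "tx_id": props.get("txId"),
            "unique": payload.get("unique"),
        }
    if version == "2.0":
        source = payload["source"]
        return {
            "connector": source["connector"],
            "schema": source["schema"],
            "table": source["table"],
            # Unknown op codes are passed through unchanged.
            "operation": DEBEZIUM_OPS.get(payload["op"], payload["op"]),
            "timestamp": source["ts_ms"],
            "before": payload.get("before"),
            "after": payload.get("after"),
            "lsn": source.get("lsn"),
            "tx_id": source.get("txId"),
            "unique": payload.get("unique"),
        }
    raise ValueError(f"unsupported message_version: {version!r}")


# Trimmed copies of the sample payloads in this section.
cdl_payload = {
    "DATA_STORE": "POSTGRESQL", "SEG_OWNER": "public", "TABLE_NAME": "ct_pg2hudi",
    "TIMESTAMP": 1707047996013, "OPERATION": "INSERT",
    "transaction": {"properties": [{"name": "lsn", "value": 163955221008},
                                   {"name": "txId", "value": 57227595}]},
    "unique": {"id": 34}, "data": {"count1": 13, "id": 34}, "before": None,
    "message_version": "1.0",
}
debezium_payload = {
    "before": None, "after": {"count1": 14, "id": 35},
    "source": {"connector": "postgresql", "ts_ms": 1707048891235,
               "schema": "public", "table": "ct_pg2hudi",
               "txId": 57227663, "lsn": 163955586912},
    "op": "c", "message_version": "2.0", "unique": {"id": 35},
}

cdl_record = normalize(cdl_payload)
debezium_record = normalize(debezium_payload)
print(cdl_record["operation"], cdl_record["lsn"])
print(debezium_record["operation"], debezium_record["lsn"])
```

Fields that exist only in Debezium JSON (for example source.db and source.snapshot) are left out of the common record here because CDL JSON has no counterpart for them in Table 1.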

The following are examples of database change information in CDL JSON and open-source Debezium JSON formats. In the schema part, type indicates the data type of a field, field indicates the name of the field, fields lists the child fields contained in the field, optional indicates whether the field can be null, and name indicates the alias of the field.

  • A sample in CDL JSON format is as follows:
    {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"DATA_STORE"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"value"}],"optional":false},"optional":false,"field":"properties"}],"optional":false,"name":"transaction","field":"transaction"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"data"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"HEARTBEAT_IDENTIFIER"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"DATA_STORE":"POSTGRESQL","SEG_OWNER":"public","TABLE_NAME":"ct_pg2hudi","TIMESTAMP":1707047996013,"OPERATION":"INSERT","LOB_COLUMNS":null,"transaction":{"properties":[{"name":"lsn","value":163955221008},{"name":"txId","value":57227595}]},"unique":{"id":34},"data":{"count1":13,"id":34,"time1":null,"decimalNum":null},"before":null,"message_version":"1.0","message_type":"0","HEARTBEAT_IDENTIFIER":"279fb050-0143-45c1-b184-50bc48c2461c"}}
  • A sample in Debezium JSON format is as follows:
    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"before":null,"after":{"count1":14,"id":35,"time1":null,"decimalNum":null},"source":{"version":"1.4.0.Final","connector":"postgresql","name":"cdl","ts_ms":1707048891235,"snapshot":"false","db":"cdl","schema":"public","table":"ct_pg2hudi","txId":57227663,"lsn":163955586912},"op":"c","ts_ms":1707048984208,"message_version":"2.0","message_type":"0","LOB_COLUMNS":null,"unique":{"id":35}}}
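
The schema part of either sample can be walked recursively to list every field with its type and nullability. The following is a sketch under stated assumptions: walk_fields is a hypothetical helper, not a CDL API, and the schema below is a trimmed fragment modeled on the samples above. Struct fields are descended through fields, and arrays of structs (such as transaction.properties in the CDL JSON sample) through items.

```python
from typing import Any, Dict, Iterator, Tuple


def walk_fields(schema: Dict[str, Any], prefix: str = "") -> Iterator[Tuple[str, str, bool]]:
    """Yield (dotted field path, type, optional) for every field in a schema node."""
    for f in schema.get("fields", []):
        path = prefix + f["field"]
        yield path, f["type"], f["optional"]
        if f["type"] == "struct":
            # Nested struct: its child fields are listed under "fields".
            yield from walk_fields(f, path + ".")
        elif f["type"] == "array" and f.get("items", {}).get("type") == "struct":
            # Array of structs: the element schema is stored under "items".
            yield from walk_fields(f["items"], path + "[].")


# Trimmed schema fragment modeled on the samples in this section.
schema = {
    "type": "struct",
    "optional": False,
    "name": "public.ct_pg2hudi",
    "fields": [
        {"type": "string", "optional": False, "field": "OPERATION"},
        {"type": "struct", "optional": False, "name": "transaction", "field": "transaction",
         "fields": [
             {"type": "array", "optional": False, "field": "properties",
              "items": {"type": "struct", "optional": False, "fields": [
                  {"type": "string", "optional": False, "field": "name"},
                  {"type": "int64", "optional": False, "field": "value"},
              ]}},
         ]},
        {"type": "struct", "optional": True, "name": "unique", "field": "unique",
         "fields": [{"type": "int32", "optional": False, "field": "id"}]},
    ],
}

fields = list(walk_fields(schema))
for path, type_name, optional in fields:
    print(f"{path}: {type_name}, optional={optional}")
```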