CDL JSON and Open-Source Debezium JSON
Data Formats
In a CDL data synchronization task, databases use different Change Data Capture (CDC) modes. To facilitate data consumption, database change information is converted into a unified format in CDL data synchronization tasks, and then written to MRS Kafka and Hudi.
Currently, CDL JSON and open-source Debezium JSON are supported only when the data source is DRS-openGauss-JSON, PostgreSQL, or openGauss. For other data sources, only CDL JSON is supported.
Both CDL JSON and open-source Debezium JSON consist of payload and schema, respectively. Payload indicates the stored database change data. Schema is the metadata of payload and provides detailed field information.
Example Database Data Changes
Data changes in CDL JSON and open-source Debezium JSON formats involve fields listed in Table 1.
CDL JSON Field |
Open-Source Debezium JSON Field |
Description |
---|---|---|
message_version |
message_version |
Format of the data to be synchronized. The options are as follows:
|
schema |
schema |
Payload data structure |
payload |
payload |
Database data to be synchronized |
before |
before |
Status of the database before the change |
data |
after |
Status of the database after the change |
- |
source.version |
Debezium version |
DATA_STORE |
source.connector |
Type of the source connector |
- |
source.name |
Name of the source connector |
TIMESTAMP |
source.ts_ms |
Timestamp when a database change occurs |
- |
source.snapshot |
Whether the change is a part of the snapshot. The value can be true or false. |
- |
source.db |
Name of the database to be synchronized |
SEG_OWNER |
source.schema |
Schema of the database to be synchronized |
TABLE_NAME |
source.table |
Table to be synchronized |
transaction.properties.lsn |
source.lsn |
Log serial number |
transaction.properties.txId |
source.txId |
Transaction ID |
OPERATION |
op |
Data operation type. The options are as follows:
|
unique |
unique |
Primary key |
- |
ts_ms |
Timestamp when the source connector processes an event |
LOB_COLUMNS |
LOB_COLUMNS |
Field name of large object data |
HEARTBEAT_IDENTIFIER |
HEARTBEAT_IDENTIFIER |
Heartbeat identifier |
The following is example database change information in CDL JSON and open-source Debezium JSON formats. type indicates the type of the field, field indicates the name of the field, fields indicates the array of fields contained in the field, optional indicates whether the field can be null, and name indicates the alias of the field.
- A sample in CDL JSON format is as follows:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"DATA_STORE"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"value"}],"optional":false},"optional":false,"field":"properties"}],"optional":false,"name":"transaction","field":"transaction"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"data"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"HEARTBEAT_IDENTIFIER"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"DATA_STORE":"POSTGRESQL","SEG_OWNER":"public","TABLE_NAME":"ct_pg2hudi","TIMESTAMP":1707047996013,"OPERATION":"INSERT","LOB_COLUMNS":null,"transaction":{"properties":[{"name":"lsn","value":163955221008},{"name":"txId","value":57227595}]},"unique":{"id":34},"data":{"count1":13,"id":34,"time1":null,"decimalNum":null},"before":null,"message_version":"1.0","message_type":"0","HEARTBEAT_IDENTIFIER":"279fb050-0143-45c1-b184-50bc48c2461c"}}
- A sample in Debezium JSON format is as follows:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"before":null,"after":{"count1":14,"id":35,"time1":null,"decimalNum":null},"source":{"version":"1.4.0.Final","connector":"postgresql","name":"cdl","ts_ms":1707048891235,"snapshot":"false","db":"cdl","schema":"public","table":"ct_pg2hudi","txId":57227663,"lsn":163955586912},"op":"c","ts_ms":1707048984208,"message_version":"2.0","message_type":"0","LOB_COLUMNS":null,"unique":{"id":35}}}
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot