CDL JSON and Open-Source Debezium JSON Data Formats
Data Format Overview
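In a CDL data synchronization task, different databases handle CDC (Change Data Capture) in different ways. To make the data easier to consume, CDL converts these database change records into a unified format, writes them to MRS Kafka, and finally writes them to Hudi.
Currently, both the CDL JSON and the open-source Debezium JSON data formats are available only when the data source is drs-opengauss-json, PgSQL, or Opengauss. All other data sources support only the CDL JSON data format.
Both the CDL JSON and the open-source Debezium JSON formats consist of two parts, payload and schema. payload holds the actual database change data. schema is the metadata of payload: it describes the data structure of payload and provides detailed field information.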
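As a minimal sketch of how a consumer might separate the two parts, the Python snippet below parses one message and returns its schema and payload. The function name `split_envelope` and the shortened sample message are hypothetical and serve only as an illustration; they are not part of any CDL API.

```python
import json


def split_envelope(raw: str) -> tuple:
    """Parse one change message and return (schema, payload)."""
    message = json.loads(raw)
    # Both formats described above carry exactly these two top-level parts.
    missing = {"schema", "payload"} - message.keys()
    if missing:
        raise ValueError(f"message is missing top-level keys: {sorted(missing)}")
    return message["schema"], message["payload"]


# Hypothetical, heavily shortened message used only for illustration.
sample = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "string", "optional": False, "field": "message_version"},
        ],
        "optional": False,
        "name": "public.ct_pg2hudi",
    },
    "payload": {"message_version": "1.0"},
})

schema, payload = split_envelope(sample)
print(schema["name"], payload["message_version"])
```

Rejecting a message that lacks either part early keeps later field lookups from failing with a less helpful error.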
Database Data Change Example
Data changes in the CDL JSON and open-source Debezium JSON formats involve the fields listed in Table 1.
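CDL JSON field name | Open-source Debezium JSON field name | Description |
---|---|---|
message_version | message_version | Format type of the JSON data to be synchronized. In the examples in this section, the CDL JSON message carries "1.0" and the Debezium JSON message carries "2.0". |
schema | schema | Data structure of payload. |
payload | payload | Database data to be synchronized. |
before | before | State of the data before the database change. |
data | after | State of the data after the database change. |
- | source.version | Debezium version. |
DATA_STORE | source.connector | Type of the source connector. |
- | source.name | Name of the source connector. |
TIMESTAMP | source.ts_ms | Timestamp at which the database change occurred. |
- | source.snapshot | Whether the event is part of a snapshot. The value is "true" or "false". |
- | source.db | Name of the database that holds the data to be synchronized. |
SEG_OWNER | source.schema | Name of the database schema that holds the data to be synchronized. |
TABLE_NAME | source.table | Name of the table that holds the data to be synchronized. |
transaction.properties.lsn | source.lsn | Log sequence number. |
transaction.properties.txId | source.txId | Transaction ID. |
OPERATION | op | Data operation type. In the examples in this section, an insert is recorded as INSERT in CDL JSON and as c in Debezium JSON. |
unique | unique | Primary key. |
- | ts_ms | Timestamp at which the source connector processed the event. |
LOB_COLUMNS | LOB_COLUMNS | Names of the fields that use large object (LOB) data types. |
HEARTBEAT_IDENTIFIER | HEARTBEAT_IDENTIFIER | Heartbeat identifier. |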
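The field correspondence in Table 1 can be sketched as a small re-keying function. This is an illustration under stated assumptions, not a CDL feature: the names `SOURCE_FIELD_MAP` and `cdl_to_debezium_names` are hypothetical, and the function only renames keys. It does not translate values, so `OPERATION` stays `INSERT` rather than becoming `c`, and `DATA_STORE` keeps its original casing. The sample payload is copied from the CDL JSON example in this section.

```python
# CDL JSON payload key -> key inside the Debezium JSON "source" struct (Table 1).
SOURCE_FIELD_MAP = {
    "DATA_STORE": "connector",
    "TIMESTAMP": "ts_ms",
    "SEG_OWNER": "schema",
    "TABLE_NAME": "table",
}


def cdl_to_debezium_names(payload: dict) -> dict:
    """Re-key a CDL JSON payload with the Debezium JSON field names.

    Hypothetical helper: only keys are renamed, values are copied unchanged.
    """
    source = {
        dst: payload[src]
        for src, dst in SOURCE_FIELD_MAP.items()
        if src in payload
    }
    # transaction.properties is a list of {"name": ..., "value": ...} entries.
    props = (payload.get("transaction") or {}).get("properties") or []
    for prop in props:
        if prop["name"] in ("lsn", "txId"):
            source[prop["name"]] = prop["value"]
    return {
        "before": payload.get("before"),
        "after": payload.get("data"),      # CDL "data" corresponds to "after"
        "source": source,
        "op": payload.get("OPERATION"),    # copied as-is, not translated
        "unique": payload.get("unique"),
    }


# Payload taken from the CDL JSON example in this section (some keys omitted).
cdl_payload = {
    "DATA_STORE": "POSTGRESQL",
    "SEG_OWNER": "public",
    "TABLE_NAME": "ct_pg2hudi",
    "TIMESTAMP": 1707047996013,
    "OPERATION": "INSERT",
    "transaction": {"properties": [
        {"name": "lsn", "value": 163955221008},
        {"name": "txId", "value": 57227595},
    ]},
    "unique": {"id": 34},
    "data": {"count1": 13, "id": 34, "time1": None, "decimalNum": None},
    "before": None,
}

result = cdl_to_debezium_names(cdl_payload)
print(result["source"])
```

Fields that Table 1 marks with "-" on the CDL JSON side (for example source.version or source.db) have no counterpart in the CDL payload, so the sketch leaves them out instead of inventing values.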
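The following are examples of database change records in the CDL JSON and open-source Debezium JSON formats. In these examples, type is the type of the current field, field is the name of the current field, fields is the array of fields that the current field contains, optional indicates whether the current field can be null, and name is the alias of the current field.
- For example, a database change record in CDL JSON format looks as follows: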
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"DATA_STORE"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"value"}],"optional":false},"optional":false,"field":"properties"}],"optional":false,"name":"transaction","field":"transaction"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"data"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"HEARTBEAT_IDENTIFIER"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"DATA_STORE":"POSTGRESQL","SEG_OWNER":"public","TABLE_NAME":"ct_pg2hudi","TIMESTAMP":1707047996013,"OPERATION":"INSERT","LOB_COLUMNS":null,"transaction":{"properties":[{"name":"lsn","value":163955221008},{"name":"txId","value":57227595}]},"unique":{"id":34},"data":{"count1":13,"id":34,"time1":null,"decimalNum":null},"before":null,"message_version":"1.0","message_type":"0","HEARTBEAT_IDENTIFIER":"279fb050-0143-45c1-b184-50bc48c2461c"}}
- For example, a database change record in Debezium JSON format looks as follows:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"before":null,"after":{"count1":14,"id":35,"time1":null,"decimalNum":null},"source":{"version":"1.4.0.Final","connector":"postgresql","name":"cdl","ts_ms":1707048891235,"snapshot":"false","db":"cdl","schema":"public","table":"ct_pg2hudi","txId":57227663,"lsn":163955586912},"op":"c","ts_ms":1707048984208,"message_version":"2.0","message_type":"0","LOB_COLUMNS":null,"unique":{"id":35}}}
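The schema keys used in both examples (type, field, fields, optional, and name) form a nested structure that can be walked recursively. The Python sketch below flattens a schema into one row per field. The function name `describe_fields` and the `[]` path notation for array items are assumptions made for this illustration; the schema excerpt is a shortened copy of the CDL JSON example.

```python
def describe_fields(schema: dict, prefix: str = "") -> list:
    """Flatten a schema into (path, type, optional, name) tuples.

    Hypothetical helper for illustration; nested structs and arrays of
    structs are expanded depth-first.
    """
    rows = []
    for f in schema.get("fields", []):
        path = prefix + f["field"]
        # "name" (the alias) is not present on every field, so use .get().
        rows.append((path, f["type"], f["optional"], f.get("name")))
        if f["type"] == "struct":
            rows.extend(describe_fields(f, prefix=path + "."))
        elif f["type"] == "array" and f.get("items", {}).get("type") == "struct":
            rows.extend(describe_fields(f["items"], prefix=path + "[]."))
    return rows


# Shortened excerpt of the schema from the CDL JSON example.
schema_excerpt = {
    "type": "struct",
    "fields": [
        {"type": "string", "optional": False, "field": "OPERATION"},
        {"type": "struct", "fields": [
            {"type": "array", "items": {"type": "struct", "fields": [
                {"type": "string", "optional": False, "field": "name"},
                {"type": "int64", "optional": False, "field": "value"},
            ], "optional": False}, "optional": False, "field": "properties"},
        ], "optional": False, "name": "transaction", "field": "transaction"},
        {"type": "struct", "fields": [
            {"type": "int32", "optional": False, "field": "id"},
        ], "optional": True, "name": "unique", "field": "unique"},
    ],
    "optional": False,
    "name": "public.ct_pg2hudi",
}

rows = describe_fields(schema_excerpt)
for path, type_, optional, name in rows:
    print(path, type_, optional, name)
```

A listing like this makes it easier to compare the two formats side by side, for example to confirm that lsn and txId sit under transaction.properties in CDL JSON but directly under source in Debezium JSON.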