Updated: 2024-11-29 GMT+08:00

CDL JSON and Open-Source Debezium JSON Data Formats

Data Format Overview

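In CDL data synchronization tasks, each type of database captures changes with its own CDC (Change Data Capture) mechanism. To make the data easier to consume, CDL converts these database change records into a unified format, writes them to MRS Kafka, and finally writes them to Hudi.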

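Currently, both the CDL JSON and open-source Debezium JSON formats are supported only when the data source is drs-opengauss-json, PgSQL, or Opengauss. All other data sources support only the CDL JSON format.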

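Both the CDL JSON and open-source Debezium JSON formats consist of two parts, payload and schema. The payload holds the actual database change data. The schema is the metadata of the payload: it describes the payload's data structure and provides detailed information about each field.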
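The payload/schema split can be illustrated with a short Python sketch (standard library only). It parses one message and checks that every top-level payload key is described by an entry in the schema's top-level fields list, which holds for both example records in this section. The function `split_message` and the trimmed sample message are hypothetical illustrations, not part of CDL.

```python
import json
from typing import Any, Dict, Tuple


def split_message(raw: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Parse one change message and return its (schema, payload) parts.

    Raises ValueError if the payload has a top-level key that the
    top-level schema "fields" list does not declare.
    """
    message = json.loads(raw)
    schema, payload = message["schema"], message["payload"]
    declared = {f["field"] for f in schema.get("fields", [])}
    undeclared = sorted(set(payload) - declared)
    if undeclared:
        raise ValueError(f"payload keys not described by schema: {undeclared}")
    return schema, payload


# Hypothetical, heavily trimmed message; real messages carry many more fields.
raw = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "string", "optional": False, "field": "OPERATION"},
            {"type": "string", "optional": False, "field": "message_version"},
        ],
        "optional": False,
        "name": "public.ct_pg2hudi",
    },
    "payload": {"OPERATION": "INSERT", "message_version": "1.0"},
})

schema, payload = split_message(raw)
print(schema["name"], payload["OPERATION"], payload["message_version"])
```

Checking the payload against the schema up front lets a consumer fail early on a malformed record instead of deep inside field-specific logic.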

Database Change Examples

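Table 1 lists the fields involved in data change records in the CDL JSON and open-source Debezium JSON formats. A "-" means the format has no corresponding field.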

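Table 1 Fields in data change records

| CDL JSON field | Open-source Debezium JSON field | Description |
|---|---|---|
| message_version | message_version | Format of the JSON data to be synchronized. 1.0: CDL JSON format; 2.0: open-source Debezium JSON format. |
| schema | schema | Data structure of the payload. |
| payload | payload | Database data to be synchronized. |
| before | before | State of the data before the database change. |
| data | after | State of the data after the database change. |
| - | source.version | Debezium version. |
| DATA_STORE | source.connector | Type of the source connector. |
| - | source.name | Name of the source connector. |
| TIMESTAMP | source.ts_ms | Timestamp at which the database change occurred. |
| - | source.snapshot | Whether the event is part of a snapshot. Value: true or false. |
| - | source.db | Name of the database containing the data to be synchronized. |
| SEG_OWNER | source.schema | Name of the database schema containing the data to be synchronized. |
| TABLE_NAME | source.table | Name of the table containing the data to be synchronized. |
| transaction.properties.lsn | source.lsn | Log sequence number (LSN). |
| transaction.properties.txId | source.txId | Transaction ID. |
| OPERATION | op | Operation type. CDL JSON: INSERT (insert), UPDATE (update), DELETE (delete). Debezium JSON: c (insert), u (update), d (delete). |
| unique | unique | Primary key. |
| - | ts_ms | Timestamp at which the source connector processed the event. |
| LOB_COLUMNS | LOB_COLUMNS | Names of the large object (LOB) columns. |
| HEARTBEAT_IDENTIFIER | HEARTBEAT_IDENTIFIER | Heartbeat identifier. |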
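Because each row of Table 1 pairs a CDL JSON field with its Debezium JSON counterpart, a consumer that must accept both formats can branch on message_version and read the corresponding fields. The following Python sketch (standard library only) does this for a handful of fields. The function name `normalize`, its output keys, and the lowercase operation names are hypothetical choices for illustration, not part of CDL; only the operation codes listed in Table 1 are handled. The sample payloads are taken from the example records in this section, trimmed to the fields used here.

```python
from typing import Any, Dict

# Operation codes from Table 1, mapped onto one hypothetical shared vocabulary.
OPS = {
    "INSERT": "insert", "UPDATE": "update", "DELETE": "delete",  # CDL JSON (1.0)
    "c": "insert", "u": "update", "d": "delete",                 # Debezium JSON (2.0)
}


def normalize(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a format-independent view of a CDL JSON or Debezium JSON payload."""
    version = payload["message_version"]
    if version == "1.0":  # CDL JSON: lsn/txId are name/value pairs
        props = {p["name"]: p["value"] for p in payload["transaction"]["properties"]}
        return {
            "op": OPS[payload["OPERATION"]],
            "schema": payload["SEG_OWNER"],
            "table": payload["TABLE_NAME"],
            "before": payload["before"],
            "after": payload["data"],
            "key": payload["unique"],
            "lsn": props.get("lsn"),
            "tx_id": props.get("txId"),
        }
    if version == "2.0":  # open-source Debezium JSON: metadata lives under "source"
        source = payload["source"]
        return {
            "op": OPS[payload["op"]],
            "schema": source["schema"],
            "table": source["table"],
            "before": payload["before"],
            "after": payload["after"],
            "key": payload["unique"],
            "lsn": source.get("lsn"),
            "tx_id": source.get("txId"),
        }
    raise ValueError(f"unsupported message_version: {version!r}")


# Payload parts of the two example records (schemas omitted, fields trimmed).
cdl_payload = {
    "SEG_OWNER": "public", "TABLE_NAME": "ct_pg2hudi", "OPERATION": "INSERT",
    "transaction": {"properties": [{"name": "lsn", "value": 163955221008},
                                   {"name": "txId", "value": 57227595}]},
    "unique": {"id": 34}, "before": None,
    "data": {"count1": 13, "id": 34, "time1": None, "decimalNum": None},
    "message_version": "1.0",
}
dbz_payload = {
    "before": None,
    "after": {"count1": 14, "id": 35, "time1": None, "decimalNum": None},
    "source": {"schema": "public", "table": "ct_pg2hudi",
               "txId": 57227663, "lsn": 163955586912},
    "op": "c", "unique": {"id": 35}, "message_version": "2.0",
}

for p in (cdl_payload, dbz_payload):
    row = normalize(p)
    print(row["op"], row["table"], row["key"], row["lsn"])
```

Branching on message_version rather than sniffing for keys such as "data" or "after" keeps the decision tied to the field the formats define for exactly this purpose.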

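The following are examples of database change records in the CDL JSON and open-source Debezium JSON formats. In each schema, type is the data type of the current field, field is the field name, fields is the array of fields contained in the current field, optional indicates whether the field can be null, and name is the alias of the field.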

  • A database change record in CDL JSON format:
    {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"DATA_STORE"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"value"}],"optional":false},"optional":false,"field":"properties"}],"optional":false,"name":"transaction","field":"transaction"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"data"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"HEARTBEAT_IDENTIFIER"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"DATA_STORE":"POSTGRESQL","SEG_OWNER":"public","TABLE_NAME":"ct_pg2hudi","TIMESTAMP":1707047996013,"OPERATION":"INSERT","LOB_COLUMNS":null,"transaction":{"properties":[{"name":"lsn","value":163955221008},{"name":"txId","value":57227595}]},"unique":{"id":34},"data":{"count1":13,"id":34,"time1":null,"decimalNum":null},"before":null,"message_version":"1.0","message_type":"0","HEARTBEAT_IDENTIFIER":"279fb050-0143-45c1-b184-50bc48c2461c"}}
  • A database change record in Debezium JSON format:
    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"before","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"count1"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.timestamp","field":"time1"},{"type":"string","optional":true,"name":"com.xxx.cdc.data.Decimal","field":"decimalNum"}],"optional":true,"name":"data","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"message_version"},{"type":"string","optional":false,"field":"message_type"},{"type":"string","optional":true,"field":"LOB_COLUMNS"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"unique","field":"unique"}],"optional":false,"name":"public.ct_pg2hudi"},"payload":{"before":null,"after":{"count1":14,"id":35,"time1":null,"decimalNum":null},"source":{"version":"1.4.0.Final","connector":"postgresql","name":"cdl","ts_ms":1707048891235,"snapshot":"false","db":"cdl","schema":"public","table":"ct_pg2hudi","txId":57227663,"lsn":163955586912},"op":"c","ts_ms":1707048984208,"message_version":"2.0","message_type":"0","LOB_COLUMNS":null,"unique":{"id":35}}}
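To see how the type, field, fields, optional and name keys nest in the schemas above, the following Python sketch walks a schema recursively and lists every field with a dotted path. It assumes only the struct and array shapes that appear in these examples; the function name `walk_schema` and the `[]` path notation for array elements are hypothetical. The sample input is copied from the CDL JSON example's schema (its TIMESTAMP and transaction fields).

```python
import json
from typing import Any, Dict, Iterator, Optional, Tuple

# (path, type, optional, name) for one schema field.
Row = Tuple[str, str, bool, Optional[str]]


def walk_schema(node: Dict[str, Any], prefix: str = "") -> Iterator[Row]:
    """Yield (path, type, optional, name) for each field under a schema node.

    struct fields are followed through "fields"; array fields whose "items"
    is a struct are followed through "items", marked "[]" in the path.
    """
    for child in node.get("fields", []):
        path = prefix + child["field"]
        yield path, child["type"], child.get("optional", False), child.get("name")
        if child["type"] == "struct":
            yield from walk_schema(child, path + ".")
        elif child["type"] == "array" and child.get("items", {}).get("type") == "struct":
            yield from walk_schema(child["items"], path + "[].")


# Two top-level fields copied from the CDL JSON example's schema.
fragment = json.loads(
    '{"type":"struct","fields":['
    '{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp",'
    '"version":1,"field":"TIMESTAMP"},'
    '{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":['
    '{"type":"string","optional":false,"field":"name"},'
    '{"type":"int64","optional":false,"field":"value"}],"optional":false},'
    '"optional":false,"field":"properties"}],"optional":false,'
    '"name":"transaction","field":"transaction"}]}'
)

for row in walk_schema(fragment):
    print(row)
```

The walk shows, for example, that the CDL JSON log sequence number is reached through transaction.properties[], where each element is a name/value pair, which matches the transaction.properties.lsn entry in Table 1.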