更新时间:2024-07-11 GMT+08:00

Kafka消息格式

同步到Kafka集群中的数据以Avro、JSON和JSON-C格式存储。不同链路支持的数据格式可参考表1

表1 支持的数据格式

数据流向

Avro

JSON

JSON-C

MySQL->Kafka

支持

支持

支持

Oracle->Kafka

支持

支持

暂不支持

DDS->Kafka

暂不支持

支持

暂不支持

PostgreSQL->Kafka

支持

支持

暂不支持

GaussDB(for MySQL)->Kafka

支持

支持

支持

GaussDB主备版->Kafka

支持

支持

暂不支持

GaussDB分布式版->Kafka

支持

支持

暂不支持

Microsoft SQL Server->Kafka

支持

支持

暂不支持

Avro格式

Avro格式的schema定义详情请参见record.rar。在实时同步到Kafka集群后,您需要根据schema定义进行数据解析。

JSON格式

MySQL、GaussDB(MySQL)到Kafka的JSON格式定义详情参考表2,DDS到Kafka的JSON格式定义详情参考表3,Oracle、PostgreSQL、GaussDB、Microsoft SQL Server到Kafka的JSON格式定义详情参考表4
表2 MySQL到Kafka的参数说明

参数名称

说明

mysqlType

源端表字段名称和类型。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL,全量同步为INIT和INIT_DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。

pkNames

主键名称。

{
    "mysqlType":{
        "c11":"binary",
        "c10":"varchar",
        "c13":"text",
        "c12":"varbinary",
        "c14":"blob",
        "c1":"varchar",
        "c2":"varbinary",
        "c3":"int",
        "c4":"datetime",
        "c5":"timestamp",
        "c6":"char",
        "c7":"float",
        "c8":"double",
        "c9":"decimal",
        "id":"int"
    },
    "id":27677,
    "es":1624614713000,
    "ts":1625058726990,
    "database":"test01",
    "table":"test ",
    "type":"UPDATE",
    "isDdl":false,
    "sql":"",
    "sqlType":{
        "c11":-2,
        "c10":12,
        "c13":-1,
        "c12":-3,
        "c14":2004,
        "c1":12,
        "c2":-3,
        "c3":4,
        "c4":94,
        "c5":93,
        "c6":1,
        "c7":6,
        "c8":8,
        "c9":3,
        "id":4
    },
    "data":[
        {
            "c11":"[]",
            "c10":"华为云huaweicloud",
            "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"104"
        }
    ],
    "old":[
        {
            "c11":"[]",
            "c10":"华为云huaweicloud",
            "c13":"asfiajhfiaf939-0239",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"103"
        }
    ],
    "pkNames":[
        "id"
    ]
}
表3 DDS到Kafka的参数说明

参数名称

说明

id

DRS内部定义的事件操作的序列号,单调递增。

op

操作类型,比如DELETE,UPDATE,INSERT,DDL。

dbType

源库类型:Mongo。

db

数据库名称。

coll

集合名称。

value

这一条记录的变更值。

where

这一条记录的变更条件。

recordType

具体的记录类型,比如insert、update、replace、doc。其中,update和replace表示op中的UPDATE具体操作。doc表示op中的DELETE删除的是文档数据而非视图数据。

extra

拓展字段,一般和recordType保持一致,作为扩展oplog记录使用。

es

这一条记录的commit时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

clusterTime

与事件关联的oplog条目的时间戳,格式为timestamp:incre(timestamp是10位unix时间戳,单位为秒;incre代表当前命令在同一秒内执行的次序)。

// insert操作
{
  "id": 256,
  "op": "INSERT",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"baz\", \"tags\": [\"mongodb\", \"database\", \"NoSQL\"]}",
  "where": null,
  "recordType": "insert",
  "extra": "insert",
  "es": 1684315111439,
  "ts": 1684315111576,
  "clusterTime": "1684344064:1"
}

// replace操作
{
  "id": 340,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"sss\"}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "replace",
  "extra": "replace",
  "es": 1684315951831,
  "ts": 1684315951961,
  "clusterTime": "1684344904:9"
}

// update 更新值操作
{
  "id": 386,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$set\": {\"c1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316412008,
  "ts": 1684316412146,
  "clusterTime": "1684345365:1"
}

// update 更新键操作
{
  "id": 414,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$unset\": {\"c1\": true}, \"$set\": {\"column1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316692054,
  "ts": 1684316692184,
  "clusterTime": "1684345648:1"
}

// remove 操作
{
  "id": 471,
  "op": "DELETE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "where": null,
  "recordType": "doc",
  "extra": "doc",
  "es": 1684317252747,
  "ts": 1684317252869,
  "clusterTime": "1684346209:1"
}
表4 其他数据库到Kafka的参数说明

参数名称

说明

columnType

源端表字段名称和数据类型。

说明:
  • 数据类型不带长度、精度等。
  • dbType为Oracle、Microsoft SQL Server时暂为空。

dbType

源库类型。

schema

schema名称。

opType

操作类型,比如DELETE,UPDATE,INSERT,DDL。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库不同引擎对应类型如下:

GaussDB主备版:当前事务的commit时间,13位Unix时间戳,单位为毫秒。

GaussDB分布式:当前事务的commit时间,13位Unix时间戳,单位为毫秒。

PostgreSQL:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。

Oracle:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。

Microsoft SQL Server:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称,dbType为Oracle时暂时为空。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。

pkNames

主键名称。

{
    "columnType": {
        "timestamp_column": "timestamp without time zone", 
        "tstzrange_column": "tstzrange", 
        "int4range_column": "int4range", 
        "char_column": "character", 
        "jsonb_column": "json", 
        "boolean_column": "boolean", 
        "bit_column": "bit", 
        "smallint_column": "smallint", 
        "bytea_column": "bytea"
    }, 
    "dbType": "GaussDB Primary/Standby", 
    "schema": "schema01", 
    "opType": "UPDATE", 
    "id": 332, 
    "es": 1639626187000, 
    "ts": 1639629261915, 
    "database": "database01", 
    "table": "table01", 
    "type": "UPDATE", 
    "isDdl": false, 
    "sql": "", 
    "sqlType": {
        "timestamp_column": 16, 
        "tstzrange_column": 46, 
        "int4range_column": 42, 
        "char_column": 9, 
        "jsonb_column": 22, 
        "boolean_column": 8, 
        "bit_column": 20, 
        "smallint_column": 2, 
        "bytea_column": 15
    }, 
    "data": [
        {
            "timestamp_column": "2021-12-16 12:31:49.344365", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "false", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "old": [
        {
            "timestamp_column": "2014-07-02 06:14:00.742", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "true", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "pkNames": null
}

JSON-C格式

JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。

JSON-C定义详情参考表5
表5 JSON-C参数说明

参数名称

说明

mysqlType

源端表字段名称和类型。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称(Oracle数据库填写schema)。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。

pkNames

主键名称。

JSON格式数据中常见的转义字符

表6 转义字符

字符

转义字符

<

\u003c

=

\u003d

>

\u003e

&

\u0026

'

\u0027