Kafka消息格式
同步到Kafka集群中的数据以Avro、JSON和JSON-C格式存储。不同链路支持的数据格式可参考表1:
数据流向 |
Avro |
JSON |
JSON-C |
---|---|---|---|
MySQL->Kafka |
支持 |
支持 |
支持 |
Oracle->Kafka |
支持 |
支持 |
暂不支持 |
DDS->Kafka |
暂不支持 |
支持 |
暂不支持 |
PostgreSQL->Kafka |
支持 |
支持 |
暂不支持 |
GaussDB(for MySQL)->Kafka |
支持 |
支持 |
支持 |
GaussDB主备版->Kafka |
支持 |
支持 |
暂不支持 |
GaussDB分布式版->Kafka |
支持 |
支持 |
暂不支持 |
Microsoft SQL Server->Kafka |
支持 |
支持 |
暂不支持 |
Avro格式
Avro格式的schema定义详情请参见record.rar。在实时同步到Kafka集群后,您需要根据schema定义进行数据解析。
JSON格式
参数名称 |
说明 |
---|---|
mysqlType |
源端表字段名称和类型。 |
id |
DRS内部定义的事件操作的序列号,单调递增。 |
es |
源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 |
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
database |
数据库名称。 |
table |
表名。 |
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL,全量同步为INIT和INIT_DDL。 |
isDdl |
是否是DDL操作。 |
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
sqlType |
源端表字段的jdbc类型。 |
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 |
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 |
pkNames |
主键名称。 |
{ "mysqlType":{ "c11":"binary", "c10":"varchar", "c13":"text", "c12":"varbinary", "c14":"blob", "c1":"varchar", "c2":"varbinary", "c3":"int", "c4":"datetime", "c5":"timestamp", "c6":"char", "c7":"float", "c8":"double", "c9":"decimal", "id":"int" }, "id":27677, "es":1624614713000, "ts":1625058726990, "database":"test01", "table":"test ", "type":"UPDATE", "isDdl":false, "sql":"", "sqlType":{ "c11":-2, "c10":12, "c13":-1, "c12":-3, "c14":2004, "c1":12, "c2":-3, "c3":4, "c4":94, "c5":93, "c6":1, "c7":6, "c8":8, "c9":3, "id":4 }, "data":[ { "c11":"[]", "c10":"华为云huaweicloud", "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103", "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2":"[]", "c3":"103", "c4":"2021-06-25 17:51:53", "c5":"1624614713.201", "c6":"!@#$%90weurtg103", "c7":"10357.0", "c8":"1.2510357E7", "c9":"9874510357", "id":"104" } ], "old":[ { "c11":"[]", "c10":"华为云huaweicloud", "c13":"asfiajhfiaf939-0239", "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2":"[]", "c3":"103", "c4":"2021-06-25 17:51:53", "c5":"1624614713.201", "c6":"!@#$%90weurtg103", "c7":"10357.0", "c8":"1.2510357E7", "c9":"9874510357", "id":"103" } ], "pkNames":[ "id" ] }
参数名称 |
说明 |
---|---|
id |
DRS内部定义的事件操作的序列号,单调递增。 |
op |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
dbType |
源库类型:Mongo。 |
db |
数据库名称。 |
coll |
集合名称。 |
value |
这一条记录的变更值。 |
where |
这一条记录的变更条件。 |
recordType |
具体的记录类型,比如insert、update、replace、doc。其中,update和replace表示op中的UPDATE具体操作。doc表示op中的DELETE删除的是文档数据而非视图数据。 |
extra |
拓展字段,一般和recordType保持一致,作为扩展oplog记录使用。 |
es |
这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 |
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
clusterTime |
与事件关联的oplog条目的时间戳,格式为timestamp:incre(timestamp是10位unix时间戳,单位为秒;incre代表当前命令在同一秒内执行的次序)。 |
// insert操作 { "id": 256, "op": "INSERT", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"baz\", \"tags\": [\"mongodb\", \"database\", \"NoSQL\"]}", "where": null, "recordType": "insert", "extra": "insert", "es": 1684315111439, "ts": 1684315111576, "clusterTime": "1684344064:1" } // replace操作 { "id": 340, "op": "UPDATE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"sss\"}", "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "recordType": "replace", "extra": "replace", "es": 1684315951831, "ts": 1684315951961, "clusterTime": "1684344904:9" } // update 更新值操作 { "id": 386, "op": "UPDATE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"$set\": {\"c1\": \"aaa\"}}", "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "recordType": "update", "extra": "update", "es": 1684316412008, "ts": 1684316412146, "clusterTime": "1684345365:1" } // update 更新键操作 { "id": 414, "op": "UPDATE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"$unset\": {\"c1\": true}, \"$set\": {\"column1\": \"aaa\"}}", "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "recordType": "update", "extra": "update", "es": 1684316692054, "ts": 1684316692184, "clusterTime": "1684345648:1" } // remove 操作 { "id": 471, "op": "DELETE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "where": null, "recordType": "doc", "extra": "doc", "es": 1684317252747, "ts": 1684317252869, "clusterTime": "1684346209:1" }
参数名称 |
说明 |
---|---|
columnType |
源端表字段名称和数据类型。
说明:
|
dbType |
源库类型。 |
schema |
schema名称。 |
opType |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
id |
DRS内部定义的事件操作的序列号,单调递增。 |
es |
源库不同引擎对应类型如下: GaussDB主备版:当前事务的commit时间,13位Unix时间戳,单位为毫秒。 GaussDB分布式:当前事务的commit时间,13位Unix时间戳,单位为毫秒。 PostgreSQL:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。 Oracle:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 Microsoft SQL Server:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 |
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
database |
数据库名称,dbType为Oracle时暂时为空。 |
table |
表名。 |
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
isDdl |
是否是DDL操作。 |
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
sqlType |
源端表字段的jdbc类型。 |
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 |
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 |
pkNames |
主键名称。 |
{
"columnType": {
"timestamp_column": "timestamp without time zone",
"tstzrange_column": "tstzrange",
"int4range_column": "int4range",
"char_column": "character",
"jsonb_column": "json",
"boolean_column": "boolean",
"bit_column": "bit",
"smallint_column": "smallint",
"bytea_column": "bytea"
},
"dbType": "GaussDB Primary/Standby",
"schema": "schema01",
"opType": "UPDATE",
"id": 332,
"es": 1639626187000,
"ts": 1639629261915,
"database": "database01",
"table": "table01",
"type": "UPDATE",
"isDdl": false,
"sql": "",
"sqlType": {
"timestamp_column": 16,
"tstzrange_column": 46,
"int4range_column": 42,
"char_column": 9,
"jsonb_column": 22,
"boolean_column": 8,
"bit_column": 20,
"smallint_column": 2,
"bytea_column": 15
},
"data": [
{
"timestamp_column": "2021-12-16 12:31:49.344365",
"tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
"boolean_column": "false",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"old": [
{
"timestamp_column": "2014-07-02 06:14:00.742",
"tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
"boolean_column": "true",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"pkNames": null
}
JSON-C格式
JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。
参数名称 |
说明 |
---|---|
mysqlType |
源端表字段名称和类型。 |
id |
DRS内部定义的事件操作的序列号,单调递增。 |
es |
源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 |
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
database |
数据库名称(Oracle数据库填写schema)。 |
table |
表名。 |
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
isDdl |
是否是DDL操作。 |
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
sqlType |
源端表字段的jdbc类型。 |
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。 |
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。 |
pkNames |
主键名称。 |
JSON格式数据中常见的转义字符
字符 |
转义字符 |
---|---|
< |
\u003c |
= |
\u003d |
> |
\u003e |
& |
\u0026 |
' |
\u0027 |