Kafka消息格式
同步到Kafka集群中的数据以Avro、JSON和JSON-C格式存储。
Avro格式
Avro格式的schema定义详情请参见record.rar。在实时同步到Kafka集群后,您需要根据schema定义进行数据解析,数据解析样例请参见drs-avro-read.rar。
JSON格式
参数名称 |
说明 |
---|---|
mysqlType |
源端表字段名称和类型。 |
id |
DRS内部定义的事件操作的序列号,单调递增。 |
es |
源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 |
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
database |
数据库名称。 |
table |
表名。 |
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL,全量同步为INIT和INIT_DDL。 |
isDdl |
是否是DDL操作。 |
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
sqlType |
源端表字段的jdbc类型。 |
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 |
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 |
pkNames |
主键名称。 |
增量同步的消息体:
{ "mysqlType":{ "c11":"binary", "c10":"varchar", "c13":"text", "c12":"varbinary", "c14":"blob", "c1":"varchar", "c2":"varbinary", "c3":"int", "c4":"datetime", "c5":"timestamp", "c6":"char", "c7":"float", "c8":"double", "c9":"decimal", "id":"int" }, "id":27677, "es":1624614713000, "ts":1625058726990, "database":"test01", "table":"test ", "type":"UPDATE", "isDdl":false, "sql":"", "sqlType":{ "c11":-2, "c10":12, "c13":-1, "c12":-3, "c14":2004, "c1":12, "c2":-3, "c3":4, "c4":94, "c5":93, "c6":1, "c7":6, "c8":8, "c9":3, "id":4 }, "data":[ { "c11":"[]", "c10":"cloud", "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103", "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2":"[]", "c3":"103", "c4":"2021-06-25 17:51:53", "c5":"1624614713.201", "c6":"!@#$%90weurtg103", "c7":"10357.0", "c8":"1.2510357E7", "c9":"9874510357", "id":"104" } ], "old":[ { "c11":"[]", "c10":"cloud", "c13":"asfiajhfiaf939-0239", "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2":"[]", "c3":"103", "c4":"2021-06-25 17:51:53", "c5":"1624614713.201", "c6":"!@#$%90weurtg103", "c7":"10357.0", "c8":"1.2510357E7", "c9":"9874510357", "id":"103" } ], "pkNames":[ "id" ] } 全量同步的消息体(表数据): { "jobId": "e28c0c1e-e3ab-4f6c-bbc2-69c28670deec", "shardId": "c66c15af-747b-4bd6-9b5d-de2e8e38f935", "identifier": "1", "eventId": "mysql-bin.000065:800;2bf665dc-f8e2-11eb-bcfd-6c4b90b9d8b3:1-251167", "mysqlType": { "c11": "binary", "c10": "varchar", "c13": "text", "c12": "varbinary", "c14": "blob", "c1": "varchar", "c2": "varbinary", "c3": "int", "c4": "datetime", "c5": "timestamp", "c6": "char", "c7": "float", "c8": "double", "c9": "decimal", "id": "int" }, "id": 27677, "es": 1624614713000, "ts": 1625058726990, "database": "test01", "table": "test", "type": "INIT", "isDdl": false, "sql": "", "sqlType": { "c11": -2, "c10": 12, "c13": -1, "c12": -3, "c14": 2004, "c1": 12, "c2": -3, "c3": 4, "c4": 94, "c5": 93, "c6": 1, "c7": 6, "c8": 8, "c9": 3, "id": 4 }, "data": [{ "c11": "[]", "c10": "cloud", "c13": "asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103", "c12": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1": "cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2": "[]", "c3": "103", "c4": "2021-06-25 17:51:53", "c5": "1624614713.201", "c6": "!@#$%90weurtg103", "c7": "10357.0", "c8": "1.2510357E7", "c9": "9874510357", "id": "104" }], "old": null, "pkNames": [ "id" ] } 全量同步的消息体(表结构): { "jobId": "e28c0c1e-e3ab-4f6c-bbc2-69c28670deec", "shardId": null, "identifier": null, "eventId": "", "mysqlType": null, "id": 0, "es": 1733998006282, "ts": 1733998006282, "database": "test01", "table": "test", "type": "INIT_DDL", "isDdl": true, "sql": "CREATE TABLE `test` ( `id` int(11) NOT NULL AUTO_INCREMENT, `c1` varchar(255) DEFAULT NULL, `c2` blob, `c3` longblob, `c4` bit(2) DEFAULT NULL, `c5` binary(255) DEFAULT NULL, `c6` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE InnoDB AUTO_INCREMENT DEFAULT CHARSET utf8", "sqlType": null, "data": null, "old": null, "pkNames": null }
JSON-C格式
JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。
参数名称 |
说明 |
---|---|
mysqlType |
源端表字段名称和类型。 |
id |
DRS内部定义的事件操作的序列号,单调递增。 |
es |
源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 |
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
database |
数据库名称(Oracle数据库填写schema)。 |
table |
表名。 |
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
isDdl |
是否是DDL操作。 |
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
sqlType |
源端表字段的jdbc类型。 |
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。 |
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。 |
pkNames |
主键名称。 |
JSON格式数据中常见的转义字符
字符 |
转义字符 |
---|---|
< |
\u003c |
= |
\u003d |
> |
\u003e |
& |
\u0026 |
' |
\u0027 |