Kafka消息格式
同步到Kafka集群中的数据以Avro、JSON和JSON-C格式存储。
avro格式
Avro格式的schema定义详情请参见record.rar。在实时同步到Kafka集群后,您需要根据schema定义进行数据解析。
JSON格式
|
参数名称 |
说明 |
|---|---|
|
mysqlType |
源端表字段名称和类型。 |
|
id |
DRS内部定义的事件操作的序列号,单调递增。 |
|
es |
源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 |
|
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
|
database |
数据库名称。 |
|
table |
表名。 |
|
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL,全量同步为INIT和INIT_DDL。 |
|
isDdl |
是否是DDL操作。 |
|
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
|
sqlType |
源端表字段的jdbc类型。 |
|
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 |
|
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 |
|
pkNames |
主键名称。 |
{
"mysqlType":{
"c11":"binary",
"c10":"varchar",
"c13":"text",
"c12":"varbinary",
"c14":"blob",
"c1":"varchar",
"c2":"varbinary",
"c3":"int",
"c4":"datetime",
"c5":"timestamp",
"c6":"char",
"c7":"float",
"c8":"double",
"c9":"decimal",
"id":"int"
},
"id":27677,
"es":1624614713000,
"ts":1625058726990,
"database":"test01",
"table":"test ",
"type":"UPDATE",
"isDdl":false,
"sql":"",
"sqlType":{
"c11":-2,
"c10":12,
"c13":-1,
"c12":-3,
"c14":2004,
"c1":12,
"c2":-3,
"c3":4,
"c4":94,
"c5":93,
"c6":1,
"c7":6,
"c8":8,
"c9":3,
"id":4
},
"data":[
{
"c11":"[]",
"c10":"cloud",
"c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
"c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
"c2":"[]",
"c3":"103",
"c4":"2021-06-25 17:51:53",
"c5":"1624614713.201",
"c6":"!@#$%90weurtg103",
"c7":"10357.0",
"c8":"1.2510357E7",
"c9":"9874510357",
"id":"104"
}
],
"old":[
{
"c11":"[]",
"c10":"cloud",
"c13":"asfiajhfiaf939-0239",
"c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
"c2":"[]",
"c3":"103",
"c4":"2021-06-25 17:51:53",
"c5":"1624614713.201",
"c6":"!@#$%90weurtg103",
"c7":"10357.0",
"c8":"1.2510357E7",
"c9":"9874510357",
"id":"103"
}
],
"pkNames":[
"id"
]
}
|
参数名称 |
说明 |
|---|---|
|
columnType |
源端表字段名称和数据类型。
说明:
|
|
dbType |
源库类型。 |
|
schema |
scheme名称。 |
|
opType |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
|
id |
DRS内部定义的事件操作的序列号,单调递增。 |
|
es |
源库不同引擎对应类型如下: Oracle:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 |
|
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
|
database |
数据库名称,dbType为Oracle时暂时为空。 |
|
table |
表名。 |
|
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
|
isDdl |
是否是DDL操作。 |
|
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
|
sqlType |
源端表字段的jdbc类型。 |
|
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 |
|
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 |
|
pkNames |
主键名称。 |
JSON-C格式
JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。
|
参数名称 |
说明 |
|---|---|
|
mysqlType |
源端表字段名称和类型。 |
|
id |
DRS内部定义的事件操作的序列号,单调递增。 |
|
es |
源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 |
|
ts |
写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 |
|
database |
数据库名称(Oracle数据库填写schema)。 |
|
table |
表名。 |
|
type |
操作类型,比如DELETE,UPDATE,INSERT,DDL。 |
|
isDdl |
是否是DDL操作。 |
|
sql |
DDL的SQL语句,在DML操作中,取值为""。 |
|
sqlType |
源端表字段的jdbc类型。 |
|
data |
最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。 |
|
old |
旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。 |
|
pkNames |
主键名称。 |
JSON格式数据中常见的转义字符
|
字符 |
转义字符 |
|---|---|
|
< |
\u003c |
|
= |
\u003d |
|
> |
\u003e |
|
& |
\u0026 |
|
' |
\u0027 |