更新时间:2022-12-08 GMT+08:00

Kafka消息格式

同步到Kafka集群中的数据以Avro、JSON和JSON-C格式存储。

avro格式

Avro格式的schema定义详情请参见record.rar。在实时同步到Kafka集群后,您需要根据schema定义进行数据解析,数据解析样例请参见drs-avro-read.rar

JSON格式

MySQL到Kafka的JSON格式定义详情参考表1
表1 MySQL到Kafka的参数说明

参数名称

说明

mysqlType

源端表字段名称和类型。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。

pkNames

主键名称。

{
    "mysqlType":{
        "c11":"binary",
        "c10":"varchar",
        "c13":"text",
        "c12":"varbinary",
        "c14":"blob",
        "c1":"varchar",
        "c2":"varbinary",
        "c3":"int",
        "c4":"datetime",
        "c5":"timestamp",
        "c6":"char",
        "c7":"float",
        "c8":"double",
        "c9":"decimal",
        "id":"int"
    },
    "id":27677,
    "es":1624614713000,
    "ts":1625058726990,
    "database":"test01",
    "table":"test ",
    "type":"UPDATE",
    "isDdl":false,
    "sql":"",
    "sqlType":{
        "c11":-2,
        "c10":12,
        "c13":-1,
        "c12":-3,
        "c14":2004,
        "c1":12,
        "c2":-3,
        "c3":4,
        "c4":94,
        "c5":93,
        "c6":1,
        "c7":6,
        "c8":8,
        "c9":3,
        "id":4
    },
    "data":[
        {
            "c11":"[]",
            "c10":"cloud",
            "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"104"
        }
    ],
    "old":[
        {
            "c11":"[]",
            "c10":"cloud",
            "c13":"asfiajhfiaf939-0239",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"103"
        }
    ],
    "pkNames":[
        "id"
    ]
}

JSON-C格式

JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。

JSON-C定义详情参考表2
表2 JSON-C参数说明

参数名称

说明

mysqlType

源端表字段名称和类型。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称(Oracle数据库填写schema)。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。

pkNames

主键名称。

JSON格式数据中常见的转义字符

表3 转义字符

字符

转义字符

<

\u003d

>

\u003e

&

\u0026

=

\u003d

'

\u0027