更新时间:2022-05-20 GMT+08:00
分享

Kafka消息格式

同步到Kafka集群中的数据以Avro、JSON和JSON-C格式存储。

avro格式

Avro格式的schema定义详情请参见record.rar。在实时同步到Kafka集群后,您需要根据schema定义进行数据解析,数据解析样例请参见drs-avro-read.rar

JSON格式

MySQL、GaussDB(MySQL)到Kafka的JSON格式定义详情参考表1,GaussDB(for openGauss)、PostgreSQL和Oracle到Kafka的JSON格式定义详情参考表2
表1 MySQL到Kafka的参数说明

参数名称

说明

mysqlType

源端表字段名称和类型。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。

pkNames

主键名称。

{
    "mysqlType":{
        "c11":"binary",
        "c10":"varchar",
        "c13":"text",
        "c12":"varbinary",
        "c14":"blob",
        "c1":"varchar",
        "c2":"varbinary",
        "c3":"int",
        "c4":"datetime",
        "c5":"timestamp",
        "c6":"char",
        "c7":"float",
        "c8":"double",
        "c9":"decimal",
        "id":"int"
    },
    "id":27677,
    "es":1624614713000,
    "ts":1625058726990,
    "database":"test01",
    "table":"test ",
    "type":"UPDATE",
    "isDdl":false,
    "sql":"",
    "sqlType":{
        "c11":-2,
        "c10":12,
        "c13":-1,
        "c12":-3,
        "c14":2004,
        "c1":12,
        "c2":-3,
        "c3":4,
        "c4":94,
        "c5":93,
        "c6":1,
        "c7":6,
        "c8":8,
        "c9":3,
        "id":4
    },
    "data":[
        {
            "c11":"[]",
            "c10":"华为云huaweicloud",
            "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"104"
        }
    ],
    "old":[
        {
            "c11":"[]",
            "c10":"华为云huaweicloud",
            "c13":"asfiajhfiaf939-0239",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"103"
        }
    ],
    "pkNames":[
        "id"
    ]
}
表2 其他数据库到Kafka的参数说明

参数名称

说明

columnType

源端表字段名称和数据类型。

说明:
  • 数据类型不带长度、精度等。
  • dbType为Oracle时暂为空。

dbType

源库类型。

不同引擎对应类型如下:

GaussDB(for openGauss)主备版:GaussDB(for openGauss) Primary/Standby

GaussDB(for openGauss)分布式:GaussDB(for openGauss) Distributed

PostgreSQL:Postgresql

Oracle:Oracle

schema

scheme名称。

opType

操作类型,比如DELETE,UPDATE,INSERT,DDL。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库不同引擎对应类型如下:

GaussDB(for openGauss)主备版:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。

GaussDB(for openGauss)分布式:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。

PostgreSQL:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。

Oracle:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称,dbType为Oracle时暂时为空。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。

pkNames

主键名称。

{
    "columnType": {
        "timestamp_column": "timestamp without time zone", 
        "tstzrange_column": "tstzrange", 
        "int4range_column": "int4range", 
        "char_column": "character", 
        "jsonb_column": "json", 
        "boolean_column": "boolean", 
        "bit_column": "bit", 
        "smallint_column": "smallint", 
        "bytea_column": "bytea"
    }, 
    "dbType": "GaussDB(for openGauss) Primary/Standby", 
    "schema": "schema01", 
    "opType": "UPDATE", 
    "id": 332, 
    "es": 1639626187000, 
    "ts": 1639629261915, 
    "database": "database01", 
    "table": "table01", 
    "type": "UPDATE", 
    "isDdl": false, 
    "sql": "", 
    "sqlType": {
        "timestamp_column": 16, 
        "tstzrange_column": 46, 
        "int4range_column": 42, 
        "char_column": 9, 
        "jsonb_column": 22, 
        "boolean_column": 8, 
        "bit_column": 20, 
        "smallint_column": 2, 
        "bytea_column": 15
    }, 
    "data": [
        {
            "timestamp_column": "2021-12-16 12:31:49.344365", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "false", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "old": [
        {
            "timestamp_column": "2014-07-02 06:14:00.742", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "true", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "pkNames": null
}

JSON-C格式

JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。

JSON-C定义详情参考表3
表3 JSON-C参数说明

参数名称

说明

mysqlType

源端表字段名称和类型。

id

DRS内部定义的事件操作的序列号,单调递增。

es

源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。

ts

写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。

database

数据库名称(Oracle数据库填写schema)。

table

表名。

type

操作类型,比如DELETE,UPDATE,INSERT,DDL。

isDdl

是否是DDL操作。

sql

DDL的SQL语句,在DML操作中,取值为""。

sqlType

源端表字段的jdbc类型。

data

最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。

old

旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。

pkNames

主键名称。

JSON格式数据中常见的转义字符

表4 转义字符

字符

转义字符

<

\u003d

>

\u003e

&

\u0026

=

\u003d

'

\u0027

分享:

    相关文档

    相关产品

close