Updated on 2023-04-26 GMT+08:00

Kafka Message Format

Data synchronized to the Kafka cluster is stored in Avro, JSON, or JSON-C format.

Avro Format

For the schema definition in Avro format, see record.rar. After data is synchronized to the Kafka cluster, parse it according to the Avro schema definition. For a data parsing example, see drs-avro-read.rar.

JSON

For details about the JSON format from MySQL and GaussDB(MySQL) to Kafka, see Table 1. For details about the JSON format from GaussDB, PostgreSQL, and Oracle to Kafka, see Table 2.
Table 1 Parameters for synchronizing from MySQL to Kafka

Parameter

Description

mysqlType

Field name and type in the source table.

id

Sequence number of an event operation defined in DRS. The value increases monotonically.

es

The time when the record is generated in the source database. The value is a 13-digit Unix timestamp in milliseconds.

ts

The time when the data is written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds.

database

Database name.

table

Table name.

type

Operation type, such as DELETE, UPDATE, INSERT, and DDL.

isDdl

Whether the operation is a DDL operation.

sql

SQL statement of the DDL operation. For non-DDL operations, the value is "".

sqlType

JDBC type of the fields in the source table.

data

Latest data, as a JSON array. If type is INSERT, this is the newly inserted data. If type is UPDATE, this is the data after the update.

old

Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data.

pkNames

Primary key names.

{
    "mysqlType":{
        "c11":"binary",
        "c10":"varchar",
        "c13":"text",
        "c12":"varbinary",
        "c14":"blob",
        "c1":"varchar",
        "c2":"varbinary",
        "c3":"int",
        "c4":"datetime",
        "c5":"timestamp",
        "c6":"char",
        "c7":"float",
        "c8":"double",
        "c9":"decimal",
        "id":"int"
    },
    "id":27677,
    "es":1624614713000,
    "ts":1625058726990,
    "database":"test01",
    "table":"test ",
    "type":"UPDATE",
    "isDdl":false,
    "sql":"",
    "sqlType":{
        "c11":-2,
        "c10":12,
        "c13":-1,
        "c12":-3,
        "c14":2004,
        "c1":12,
        "c2":-3,
        "c3":4,
        "c4":94,
        "c5":93,
        "c6":1,
        "c7":6,
        "c8":8,
        "c9":3,
        "id":4
    },
    "data":[
        {
            "c11":"[]",
            "c10":"Huawei Cloud huaweicloud",
            "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"104"
        }
    ],
    "old":[
        {
            "c11":"[]",
            "c10":"Huawei Cloud huaweicloud",
            "c13":"asfiajhfiaf939-0239",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"103"
        }
    ],
    "pkNames":[
        "id"
    ]
}
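
As a rough illustration of how a consumer might read such a message, the Python sketch below parses a trimmed copy of the sample above, converts es and ts to UTC datetimes, and lists the columns whose values differ between old and data. The function names and the trimmed payload (most columns omitted, the c13 value shortened) are assumptions for illustration and are not part of DRS.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the sample message above (illustrative subset of columns).
SAMPLE = '''
{
    "id": 27677,
    "es": 1624614713000,
    "ts": 1625058726990,
    "database": "test01",
    "type": "UPDATE",
    "isDdl": false,
    "sql": "",
    "data": [{"c3": "103", "c13": "asfiajhfiaf939-0239uoituq", "id": "104"}],
    "old": [{"c3": "103", "c13": "asfiajhfiaf939-0239", "id": "103"}],
    "pkNames": ["id"]
}
'''


def ms_to_utc(ms):
    """Convert a 13-digit millisecond Unix timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def summarize_event(raw):
    """Parse one message and report its times and, for UPDATE, the changed columns."""
    event = json.loads(raw)
    changed = []
    if event["type"] == "UPDATE":
        # data and old are parallel arrays: one entry per affected row.
        for new_row, old_row in zip(event["data"], event["old"]):
            changed.append(
                sorted(k for k in new_row if new_row[k] != old_row.get(k))
            )
    return {
        "type": event["type"],
        "source_time": ms_to_utc(event["es"]),
        "kafka_time": ms_to_utc(event["ts"]),
        "changed_columns": changed,
    }


summary = summarize_event(SAMPLE)
print(summary["source_time"].isoformat())
print(summary["changed_columns"])
```

Note that all column values in data and old arrive as strings, so any numeric or date conversion is left to the consumer.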
Table 2 Parameters for synchronizing from other databases to Kafka

Parameter

Description

columnType

Field name and data type in the source table.

NOTE:
  • The data type does not contain the length and precision.
  • This parameter is left blank when dbType is set to Oracle.

dbType

Source database type.

Possible values:

GaussDB Primary/Standby

GaussDB Distributed

PostgreSQL

Oracle

schema

Schema name.

opType

Operation type, such as DELETE, UPDATE, INSERT, and DDL.

id

Sequence number of an event operation defined in DRS. The value increases monotonically.

es

Time recorded in the source database, which depends on the source DB engine type:

GaussDB primary/standby: commit time of the previous transaction. The value is a 13-digit Unix timestamp, in milliseconds.

GaussDB distributed: commit time of the previous transaction. The value is a 13-digit Unix timestamp, in milliseconds.

PostgreSQL: commit time of the previous transaction. The value is a 13-digit Unix timestamp, in milliseconds.

Oracle: commit time of a record. The value is a 13-digit Unix timestamp, in milliseconds.

ts

The time when the data is written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds.

database

Database name. This parameter is left blank when dbType is set to Oracle.

table

Table name.

type

Operation type, such as DELETE, UPDATE, INSERT, and DDL.

isDdl

Whether the operation is a DDL operation.

sql

SQL statement of the DDL operation. For non-DDL operations, the value is "".

sqlType

JDBC type of the fields in the source table.

data

Latest data, as a JSON array. If type is INSERT, this is the newly inserted data. If type is UPDATE, this is the data after the update.

old

Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data.

pkNames

Primary key names.

{
    "columnType": {
        "timestamp_column": "timestamp without time zone", 
        "tstzrange_column": "tstzrange", 
        "int4range_column": "int4range", 
        "char_column": "character", 
        "jsonb_column": "json", 
        "boolean_column": "boolean", 
        "bit_column": "bit", 
        "smallint_column": "smallint", 
        "bytea_column": "bytea"
    }, 
    "dbType": "GaussDB Primary/Standby", 
    "schema": "schema01", 
    "opType": "UPDATE", 
    "id": 332, 
    "es": 1639626187000, 
    "ts": 1639629261915, 
    "database": "database01", 
    "table": "table01", 
    "type": "UPDATE", 
    "isDdl": false, 
    "sql": "", 
    "sqlType": {
        "timestamp_column": 16, 
        "tstzrange_column": 46, 
        "int4range_column": 42, 
        "char_column": 9, 
        "jsonb_column": 22, 
        "boolean_column": 8, 
        "bit_column": 20, 
        "smallint_column": 2, 
        "bytea_column": 15
    }, 
    "data": [
        {
            "timestamp_column": "2021-12-16 12:31:49.344365", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "false", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "old": [
        {
            "timestamp_column": "2014-07-02 06:14:00.742", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "true", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "pkNames": null
}
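
A consumer of this format may want to tolerate a null pkNames and decode values before use. The Python sketch below works on a trimmed copy of the sample above. Treating the bytea value as a hexadecimal string is an assumption inferred from the sample (62797465615f64617461 decodes to the ASCII text bytea_data), and the helper names are hypothetical.

```python
import json

# Trimmed copy of the sample message above (illustrative subset of columns).
SAMPLE = '''
{
    "columnType": {"smallint_column": "smallint", "bytea_column": "bytea"},
    "dbType": "GaussDB Primary/Standby",
    "schema": "schema01",
    "opType": "UPDATE",
    "es": 1639626187000,
    "ts": 1639629261915,
    "database": "database01",
    "table": "table01",
    "data": [{"smallint_column": "12", "bytea_column": "62797465615f64617461"}],
    "pkNames": null
}
'''


def primary_keys(event):
    """Return pkNames as a list, treating JSON null as no primary key reported."""
    return event.get("pkNames") or []


def decode_row(row, column_types):
    """Decode values by column type; unknown types are passed through unchanged."""
    decoded = {}
    for name, value in row.items():
        col_type = column_types.get(name)
        if value is None:
            decoded[name] = None
        elif col_type == "bytea":
            decoded[name] = bytes.fromhex(value)  # assumption: hex-encoded bytes
        elif col_type == "smallint":
            decoded[name] = int(value)
        else:
            decoded[name] = value
    return decoded


event = json.loads(SAMPLE)
# columnType is left blank for Oracle sources, so fall back to an empty mapping.
column_types = event.get("columnType") or {}
rows = [decode_row(r, column_types) for r in event["data"]]
# Gap between the source-side time (es) and the Kafka write time (ts).
delay_ms = event["ts"] - event["es"]
print(rows, primary_keys(event), delay_ms)
```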

JSON-C

JSON-C is similar to JSON. The difference lies in how a DELETE operation is recorded: JSON stores the deleted data in old, whereas JSON-C stores it in data. In addition, data of the timestamp type is converted into a character string in the format yyyy-mm-dd hh:mm:ss.

For details, see Table 3.
Table 3 JSON-C parameter description

Parameter

Description

mysqlType

Field name and type in the source table.

id

Sequence number of an event operation defined in DRS. The value increases monotonically.

es

The time when the record is generated in the source database. The value is a 13-digit Unix timestamp in milliseconds.

ts

The time when the data is written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds.

database

Database name. For an Oracle source, this field contains the schema name.

table

Table name.

type

Operation type, such as DELETE, UPDATE, INSERT, and DDL.

isDdl

Whether the operation is a DDL operation.

sql

SQL statement of the DDL operation. For non-DDL operations, the value is "".

sqlType

JDBC type of the fields in the source table.

data

Latest data, which is a JSON array. If type is set to INSERT, this parameter indicates the latest inserted data. If type is set to UPDATE, this parameter indicates the latest updated data. If type is set to DELETE, this parameter indicates the deleted data.

old

Old data. If type is set to UPDATE, the value indicates the data before update. If type is set to INSERT, the value is null.

pkNames

Primary key names.
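
Because JSON and JSON-C place deleted rows in different fields, a consumer that handles both may need a small branch. The Python sketch below is a hypothetical helper: the fmt argument and its values "json" and "json-c" are assumptions for illustration, since none of the fields listed above identifies the format. The sample events are also invented, including the use of null for the unused field.

```python
from datetime import datetime


def deleted_rows(event, fmt):
    """Return the rows removed by a DELETE event, or [] for other event types.

    fmt is a caller-supplied label (assumption): "json" or "json-c".
    """
    if fmt not in ("json", "json-c"):
        raise ValueError(f"unknown format: {fmt}")
    if event.get("type") != "DELETE":
        return []
    # JSON keeps deleted rows in "old"; JSON-C keeps them in "data".
    field = "data" if fmt == "json-c" else "old"
    return event.get(field) or []


def parse_jsonc_timestamp(text):
    """Parse the yyyy-mm-dd hh:mm:ss string used for timestamp columns."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# Invented sample events; the null in the unused field is an assumption.
row = {"id": "104", "c4": "2021-06-25 17:51:53"}
json_event = {"type": "DELETE", "data": None, "old": [row]}
jsonc_event = {"type": "DELETE", "data": [row], "old": None}

print(deleted_rows(json_event, "json"))
print(deleted_rows(jsonc_event, "json-c"))
print(parse_jsonc_timestamp(row["c4"]))
```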

Common Escape Characters in JSON

Table 4 Escape characters

Character

Escape character

<

\u003c

>

\u003e

&

\u0026

=

\u003d

'

\u0027
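
These are standard JSON Unicode escapes, so a conforming JSON parser restores the original characters without extra handling. The short Python sketch below shows this with the standard json module; the payload is an invented example, not DRS output.

```python
import json

# Raw message text as it might appear in Kafka, with the escapes from Table 4.
raw = r'{"c1": "a \u003c b \u0026 c \u003d \u0027d\u0027 \u003e e"}'

row = json.loads(raw)
print(row["c1"])
```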