Updated on 2024-03-28 GMT+08:00

Kafka Message Format

Data synchronized to the Kafka cluster can be stored in Avro, JSON, or JSON-C format. Table 1 lists the formats supported for each data flow.

Table 1 Data formats

| Data Flow | Avro | JSON | JSON-C |
|---|---|---|---|
| MySQL -> Kafka | Supported | Supported | Supported |
| Oracle -> Kafka | Supported | Supported | Not supported |
| DDS -> Kafka | Not supported | Supported | Not supported |
| PostgreSQL -> Kafka | Supported | Supported | Not supported |
| GaussDB(for MySQL) -> Kafka | Supported | Supported | Supported |
| GaussDB primary/standby -> Kafka | Supported | Supported | Not supported |
| GaussDB distributed -> Kafka | Supported | Supported | Not supported |
| Microsoft SQL Server -> Kafka | Supported | Supported | Not supported |

Avro

The Avro schema definition is provided in record.rar. After data is synchronized to the Kafka cluster, parse the data according to that schema.

JSON

For details about the JSON format from MySQL and GaussDB(for MySQL) to Kafka, see Table 2. For details about the JSON format from DDS to Kafka, see Table 3. For details about the JSON format from PostgreSQL, GaussDB, Microsoft SQL Server, and Oracle to Kafka, see Table 4.

Table 2 Parameters for synchronizing data from MySQL to Kafka

| Parameter | Description |
|---|---|
| mysqlType | Field names and types in the source table. |
| id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
| es | Time when the record was generated in the source database. The value is a 13-digit Unix timestamp in milliseconds. |
| ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
| database | Database name. |
| table | Table name. |
| type | Operation type, such as DELETE, UPDATE, INSERT, or DDL. During full synchronization, the value can be INIT or INIT_DDL. |
| isDdl | Whether the operation is a DDL operation. |
| sql | SQL statement of the DDL operation. For DML operations, the value is "". |
| sqlType | JDBC types of the fields in the source table. |
| data | Latest data, as a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. |
| old | Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data. |
| pkNames | Primary key names. |

{
    "mysqlType":{
        "c11":"binary",
        "c10":"varchar",
        "c13":"text",
        "c12":"varbinary",
        "c14":"blob",
        "c1":"varchar",
        "c2":"varbinary",
        "c3":"int",
        "c4":"datetime",
        "c5":"timestamp",
        "c6":"char",
        "c7":"float",
        "c8":"double",
        "c9":"decimal",
        "id":"int"
    },
    "id":27677,
    "es":1624614713000,
    "ts":1625058726990,
    "database":"test01",
    "table":"test ",
    "type":"UPDATE",
    "isDdl":false,
    "sql":"",
    "sqlType":{
        "c11":-2,
        "c10":12,
        "c13":-1,
        "c12":-3,
        "c14":2004,
        "c1":12,
        "c2":-3,
        "c3":4,
        "c4":94,
        "c5":93,
        "c6":1,
        "c7":6,
        "c8":8,
        "c9":3,
        "id":4
    },
    "data":[
        {
            "c11":"[]",
            "c10":"Huawei Cloud huaweicloud",
            "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"104"
        }
    ],
    "old":[
        {
            "c11":"[]",
            "c10":"Huawei Cloud huaweicloud",
            "c13":"asfiajhfiaf939-0239",
            "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
            "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
            "c2":"[]",
            "c3":"103",
            "c4":"2021-06-25 17:51:53",
            "c5":"1624614713.201",
            "c6":"!@#$%90weurtg103",
            "c7":"10357.0",
            "c8":"1.2510357E7",
            "c9":"9874510357",
            "id":"103"
        }
    ],
    "pkNames":[
        "id"
    ]
}
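
As a rough illustration of how a consumer might read this format, the following Python sketch parses a shortened copy of the message above (fewer columns, shorter values), converts es and ts to UTC datetimes, and compares data with old to list the changed columns. The function names to_utc, decode_byte_list, and changed_columns are hypothetical. Pairing rows by array position and treating binary values as stringified byte lists are assumptions drawn from the sample, not documented guarantees.

```python
import json
from datetime import datetime, timezone

# Shortened copy of the sample UPDATE message shown above.
RAW = """
{
    "id": 27677,
    "es": 1624614713000,
    "ts": 1625058726990,
    "database": "test01",
    "table": "test",
    "type": "UPDATE",
    "isDdl": false,
    "sql": "",
    "data": [{"c13": "asfiajhfiaf939-0239uoit", "c12": "[106, 103, 111]", "id": "104"}],
    "old":  [{"c13": "asfiajhfiaf939-0239", "c12": "[106, 103, 111]", "id": "103"}],
    "pkNames": ["id"]
}
"""


def to_utc(ms):
    """Convert a 13-digit Unix timestamp in milliseconds to a UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def decode_byte_list(text):
    """Turn a stringified byte list such as "[106, 103]" into bytes.

    Assumption: binary columns are rendered this way, as in the sample.
    """
    return bytes(json.loads(text))


def changed_columns(message):
    """Return, per row, the columns whose values differ between old and data.

    Assumption: data[i] and old[i] describe the same row.
    """
    changes = []
    for new_row, old_row in zip(message["data"], message["old"] or []):
        changes.append({
            col: (old_row.get(col), value)
            for col, value in new_row.items()
            if old_row.get(col) != value
        })
    return changes


message = json.loads(RAW)
source_time = to_utc(message["es"])      # when the record was generated
written_time = to_utc(message["ts"])     # when it was written to Kafka
diff = changed_columns(message)          # {column: (old value, new value)}
payload = decode_byte_list(message["data"][0]["c12"])
```

Note that every column value arrives as a string, so numeric and temporal columns need an explicit conversion on the consumer side.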

Table 3 Parameters for synchronizing data from DDS to Kafka

| Parameter | Description |
|---|---|
| id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
| op | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
| dbType | Source database type. The value is MongoDB. |
| db | Database name. |
| coll | Collection name. |
| value | Changed value of the record. |
| where | Condition that identifies the changed record. |
| recordType | Record type, such as insert, update, replace, or doc. update and replace both correspond to the UPDATE operation in op. doc indicates that the DELETE operation in op deletes document data rather than view data. |
| extra | Extended field, used as an extended oplog record. The value is the same as that of recordType. |
| es | Commit time of the record. The value is a 13-digit Unix timestamp in milliseconds. |
| ts | Time when the data was written to the destination Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
| clusterTime | Timestamp of the oplog entry associated with the event, in the format timestamp:incre. timestamp is a Unix timestamp in seconds, and incre is the execution sequence of the command within that second. |

// insert operation
{
  "id": 256,
  "op": "INSERT",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"baz\", \"tags\": [\"mongodb\", \"database\", \"NoSQL\"]}",
  "where": null,
  "recordType": "insert",
  "extra": "insert",
  "es": 1684315111439,
  "ts": 1684315111576,
  "clusterTime": "1684344064:1"
}

// replace operation
{
  "id": 340,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"sss\"}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "replace",
  "extra": "replace",
  "es": 1684315951831,
  "ts": 1684315951961,
  "clusterTime": "1684344904:9"
}

// value update operation
{
  "id": 386,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$set\": {\"c1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316412008,
  "ts": 1684316412146,
  "clusterTime": "1684345365:1"
}

// key update operation
{
  "id": 414,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$unset\": {\"c1\": true}, \"$set\": {\"column1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316692054,
  "ts": 1684316692184,
  "clusterTime": "1684345648:1"
}

// remove operation
{
  "id": 471,
  "op": "DELETE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "where": null,
  "recordType": "doc",
  "extra": "doc",
  "es": 1684317252747,
  "ts": 1684317252869,
  "clusterTime": "1684346209:1"
}
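
The value and where fields in these samples are strings that contain ObjectId("...") calls, which a standard JSON parser rejects. The sketch below shows one possible way to handle that in Python: it rewrites each ObjectId("...") into a {"$oid": "..."} object before parsing, and splits clusterTime into its two parts. The helper names parse_document and parse_cluster_time are hypothetical, and the sketch assumes ObjectId is the only non-JSON construct in the payload; other constructs would need similar handling.

```python
import json
import re

# The "value update" sample from above, without the // comment line,
# because standard JSON parsers reject comments.
RAW = r"""
{
  "id": 386,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$set\": {\"c1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316412008,
  "ts": 1684316412146,
  "clusterTime": "1684345365:1"
}
"""

# Matches ObjectId("<24 hex digits>") as it appears in the samples.
OBJECT_ID = re.compile(r'ObjectId\("([0-9a-fA-F]{24})"\)')


def parse_document(text):
    """Parse a value/where string into a dict, or return None for null.

    Assumption: ObjectId(...) is the only non-JSON construct present.
    """
    if text is None:
        return None
    return json.loads(OBJECT_ID.sub(r'{"$oid": "\1"}', text))


def parse_cluster_time(text):
    """Split "timestamp:incre" into (seconds, increment) integers."""
    seconds, increment = text.split(":")
    return int(seconds), int(increment)


event = json.loads(RAW)
change = parse_document(event["value"])
condition = parse_document(event["where"])
cluster_seconds, cluster_increment = parse_cluster_time(event["clusterTime"])
```

A plain regular expression would also rewrite an ObjectId("...") that happens to appear inside a string value, so a production consumer may prefer a dedicated parser.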

Table 4 Parameters for synchronizing data from other databases to Kafka

| Parameter | Description |
|---|---|
| columnType | Field names and data types in the source table. The data type does not include the length or precision. This parameter is left blank when dbType is Oracle or Microsoft SQL Server. |
| dbType | Source database type. |
| schema | Schema name. |
| opType | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
| id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
| es | Time of the record in the source database, as a 13-digit Unix timestamp in milliseconds. For GaussDB primary/standby, GaussDB distributed, and PostgreSQL, this is the commit time of the previous transaction. For Oracle and Microsoft SQL Server, this is the commit time of the record. |
| ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
| database | Database name. This parameter is left blank when dbType is Oracle. |
| table | Table name. |
| type | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
| isDdl | Whether the operation is a DDL operation. |
| sql | SQL statement of the DDL operation. For DML operations, the value is "". |
| sqlType | JDBC types of the fields in the source table. |
| data | Latest data, as a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. |
| old | Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data. |
| pkNames | Primary key names. |

{
    "columnType": {
        "timestamp_column": "timestamp without time zone", 
        "tstzrange_column": "tstzrange", 
        "int4range_column": "int4range", 
        "char_column": "character", 
        "jsonb_column": "json", 
        "boolean_column": "boolean", 
        "bit_column": "bit", 
        "smallint_column": "smallint", 
        "bytea_column": "bytea"
    }, 
    "dbType": "GaussDB Primary/Standby", 
    "schema": "schema01", 
    "opType": "UPDATE", 
    "id": 332, 
    "es": 1639626187000, 
    "ts": 1639629261915, 
    "database": "database01", 
    "table": "table01", 
    "type": "UPDATE", 
    "isDdl": false, 
    "sql": "", 
    "sqlType": {
        "timestamp_column": 16, 
        "tstzrange_column": 46, 
        "int4range_column": 42, 
        "char_column": 9, 
        "jsonb_column": 22, 
        "boolean_column": 8, 
        "bit_column": 20, 
        "smallint_column": 2, 
        "bytea_column": 15
    }, 
    "data": [
        {
            "timestamp_column": "2021-12-16 12:31:49.344365", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "false", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "old": [
        {
            "timestamp_column": "2014-07-02 06:14:00.742", 
            "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", 
            "int4range_column": "[11,20)", 
            "char_column": "g", 
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", 
            "boolean_column": "true", 
            "bit_column": "1", 
            "smallint_column": "12", 
            "bytea_column": "62797465615f64617461"
        }
    ], 
    "pkNames": null
}
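
Because every column value in data and old is delivered as a string, a consumer typically converts values using the type names in columnType. The following Python sketch shows one way this might look for four of the columns in the sample above. The CONVERTERS mapping and convert_row function are hypothetical, and decoding bytea as hexadecimal text is an assumption based on the sample value, not a documented rule.

```python
import json

# Shortened copy of the sample message above (four columns only).
RAW = r"""
{
    "columnType": {
        "jsonb_column": "json",
        "boolean_column": "boolean",
        "smallint_column": "smallint",
        "bytea_column": "bytea"
    },
    "dbType": "GaussDB Primary/Standby",
    "type": "UPDATE",
    "data": [
        {
            "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
            "boolean_column": "false",
            "smallint_column": "12",
            "bytea_column": "62797465615f64617461"
        }
    ],
    "pkNames": null
}
"""

# Hypothetical converters keyed by the type names seen in columnType.
CONVERTERS = {
    "json": json.loads,
    "boolean": lambda text: text == "true",
    "smallint": int,
    "bytea": bytes.fromhex,  # assumes hex text, as the sample suggests
}


def convert_row(row, column_types):
    """Convert string values to Python values; unknown types stay strings."""
    converted = {}
    for name, text in row.items():
        convert = CONVERTERS.get(column_types.get(name))
        converted[name] = convert(text) if convert and text is not None else text
    return converted


message = json.loads(RAW)
# columnType is left blank for Oracle and Microsoft SQL Server sources,
# so fall back to an empty mapping and keep the raw strings in that case.
row = convert_row(message["data"][0], message["columnType"] or {})
```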

JSON-C

JSON-C is similar to JSON, with two differences. For a DELETE operation, JSON stores the deleted data in old, whereas JSON-C stores it in data. In addition, JSON-C converts data of the timestamp type into a character string in the format yyyy-mm-dd hh:mm:ss.

For details, see Table 5.

Table 5 JSON-C parameter description

| Parameter | Description |
|---|---|
| mysqlType | Field names and types in the source table. |
| id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
| es | Time when the record was generated in the source database. The value is a 13-digit Unix timestamp in milliseconds. |
| ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
| database | Database name. For an Oracle database, the value is the schema. |
| table | Table name. |
| type | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
| isDdl | Whether the operation is a DDL operation. |
| sql | SQL statement of the DDL operation. For DML operations, the value is "". |
| sqlType | JDBC types of the fields in the source table. |
| data | Latest data, as a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. If type is DELETE, this is the deleted data. |
| old | Old data. If type is UPDATE, this is the data before the update. If type is INSERT, the value is null. |
| pkNames | Primary key names. |
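
The difference in where deleted rows are stored can be hidden behind a small helper. The Python sketch below is one possible approach: deleted_rows reads old for JSON and data for JSON-C, and parse_timestamp reads the yyyy-mm-dd hh:mm:ss strings that JSON-C produces. Both function names, the message_format parameter, and the two reduced DELETE messages are hypothetical. The sketch also assumes that the unused field is null.

```python
from datetime import datetime


def deleted_rows(message, message_format):
    """Return the rows removed by a DELETE message, or [] for other types.

    message_format is "JSON" or "JSON-C" (a hypothetical parameter; the
    message itself is not assumed to say which format it uses).
    """
    if message.get("type") != "DELETE":
        return []
    field = "data" if message_format == "JSON-C" else "old"
    return message.get(field) or []


def parse_timestamp(text):
    """Parse a JSON-C timestamp string in the format yyyy-mm-dd hh:mm:ss."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# Hypothetical DELETE messages, reduced to the fields that matter here.
json_delete = {"type": "DELETE", "data": None, "old": [{"id": "104"}]}
json_c_delete = {"type": "DELETE", "data": [{"id": "104"}], "old": None}

rows_from_json = deleted_rows(json_delete, "JSON")
rows_from_json_c = deleted_rows(json_c_delete, "JSON-C")
when = parse_timestamp("2021-06-25 17:51:53")
```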

Common Escape Characters in JSON

Table 6 Escape characters

| Character | Escape character |
|---|---|
| < | \u003c |
| = | \u003d |
| > | \u003e |
| & | \u0026 |
| ' | \u0027 |
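
These are standard JSON Unicode escapes, so a conforming JSON parser restores the original characters without extra handling. The short Python sketch below illustrates this with a hypothetical payload fragment; the field names c1 and c2 and their values are made up for the example.

```python
import json

# Hypothetical payload fragment that uses the escapes from Table 6.
raw = r'{"c1": "a \u003c b \u0026 b \u003e c", "c2": "x\u003d\u0027y\u0027"}'

# json.loads decodes \uXXXX escapes as part of normal string parsing.
decoded = json.loads(raw)
```

Only consumers that process the message as raw text, for example with string search, need to account for the escaped forms.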