Kafka Message Format
Data synchronized to the Kafka cluster can be stored in Avro, JSON, or JSON-C format. For the formats supported in each data flow scenario, see Table 1.
Table 1 Data formats supported by each data flow
Data Flow | Avro | JSON | JSON-C |
---|---|---|---|
MySQL -> Kafka | Supported | Supported | Supported |
Oracle -> Kafka | Supported | Supported | Not supported |
DDS -> Kafka | Not supported | Supported | Not supported |
PostgreSQL -> Kafka | Supported | Supported | Not supported |
GaussDB(for MySQL) -> Kafka | Supported | Supported | Supported |
GaussDB primary/standby -> Kafka | Supported | Supported | Not supported |
GaussDB distributed -> Kafka | Supported | Supported | Not supported |
Microsoft SQL Server -> Kafka | Supported | Supported | Not supported |
Avro
For the schema definition in Avro format, see record.rar. After data is synchronized to the Kafka cluster, parse it according to this Avro schema.
JSON
MySQL -> Kafka and GaussDB(for MySQL) -> Kafka
Parameter | Description |
---|---|
mysqlType | Field names and types in the source table. |
id | Sequence number of the event operation, defined by DRS. The value increases monotonically. |
es | Time when the record was generated in the source database. The value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
database | Database name. |
table | Table name. |
type | Operation type, for example, DELETE, UPDATE, INSERT, or DDL. For full synchronization, the value is INIT or INIT_DDL. |
isDdl | Whether the operation is a DDL operation. |
sql | SQL statement of the DDL operation. For non-DDL operations, the value is "" (an empty string). |
sqlType | JDBC types of the fields in the source table. |
data | Latest data, as a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. |
old | Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data. |
pkNames | Primary key names. |
{
  "mysqlType": {
    "c11": "binary",
    "c10": "varchar",
    "c13": "text",
    "c12": "varbinary",
    "c14": "blob",
    "c1": "varchar",
    "c2": "varbinary",
    "c3": "int",
    "c4": "datetime",
    "c5": "timestamp",
    "c6": "char",
    "c7": "float",
    "c8": "double",
    "c9": "decimal",
    "id": "int"
  },
  "id": 27677,
  "es": 1624614713000,
  "ts": 1625058726990,
  "database": "test01",
  "table": "test ",
  "type": "UPDATE",
  "isDdl": false,
  "sql": "",
  "sqlType": {
    "c11": -2,
    "c10": 12,
    "c13": -1,
    "c12": -3,
    "c14": 2004,
    "c1": 12,
    "c2": -3,
    "c3": 4,
    "c4": 94,
    "c5": 93,
    "c6": 1,
    "c7": 6,
    "c8": 8,
    "c9": 3,
    "id": 4
  },
  "data": [
    {
      "c11": "[]",
      "c10": "Huawei Cloud huaweicloud",
      "c13": "asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
      "c12": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
      "c14": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
      "c1": "cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
      "c2": "[]",
      "c3": "103",
      "c4": "2021-06-25 17:51:53",
      "c5": "1624614713.201",
      "c6": "!@#$%90weurtg103",
      "c7": "10357.0",
      "c8": "1.2510357E7",
      "c9": "9874510357",
      "id": "104"
    }
  ],
  "old": [
    {
      "c11": "[]",
      "c10": "Huawei Cloud huaweicloud",
      "c13": "asfiajhfiaf939-0239",
      "c12": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
      "c14": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
      "c1": "cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
      "c2": "[]",
      "c3": "103",
      "c4": "2021-06-25 17:51:53",
      "c5": "1624614713.201",
      "c6": "!@#$%90weurtg103",
      "c7": "10357.0",
      "c8": "1.2510357E7",
      "c9": "9874510357",
      "id": "103"
    }
  ],
  "pkNames": [
    "id"
  ]
}
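The following is a minimal Python sketch of how a consumer might turn one message of this format into per-row changes. It assumes the message has already been read from Kafka as a string. The helper names `row_changes` and `ms_to_utc` are hypothetical, and pairing `old[i]` with `data[i]` for UPDATE is an assumption of this sketch, not documented DRS behavior. The sample in the `__main__` block is a trimmed version of the example above.

```python
import json
from datetime import datetime, timezone


def ms_to_utc(ms):
    """Convert a 13-digit Unix timestamp in milliseconds (es or ts) to a UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def row_changes(raw):
    """Return (type, primary key, before, after) tuples for one JSON message.

    Field usage follows the table above: for UPDATE, `data` holds the new rows
    and `old` the previous rows; for DELETE, the deleted rows are in `old`.
    Assumption: data[i] and old[i] describe the same row.
    """
    msg = json.loads(raw)
    op = msg["type"]
    pk_names = msg.get("pkNames") or []
    data = msg.get("data") or []
    old = msg.get("old") or []

    def key(row):
        # Primary key values in pkNames order; an empty tuple if pkNames is null.
        return tuple(row.get(name) for name in pk_names)

    if msg.get("isDdl"):
        return []  # DDL events carry the statement in `sql`, not row data
    if op == "UPDATE":
        return [(op, key(after), before, after) for before, after in zip(old, data)]
    if op == "DELETE":
        return [(op, key(before), before, None) for before in old]
    # INSERT rows are in `data`; full-synchronization INIT rows are assumed to be there too.
    return [(op, key(after), None, after) for after in data]


if __name__ == "__main__":
    sample = json.dumps({
        "id": 27677, "es": 1624614713000, "ts": 1625058726990,
        "database": "test01", "table": "test", "type": "UPDATE",
        "isDdl": False, "sql": "",
        "data": [{"id": "104", "c3": "103"}],
        "old": [{"id": "103", "c3": "103"}],
        "pkNames": ["id"],
    })
    for change in row_changes(sample):
        print(change)
    print("source time:", ms_to_utc(1624614713000))
```

Note that in the example above the primary key itself changes (id 103 becomes 104), which is why the sketch reports the key taken from the new row for UPDATE.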
DDS -> Kafka
Parameter | Description |
---|---|
id | Sequence number of the event operation, defined by DRS. The value increases monotonically. |
op | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
dbType | Source database type. The value is MongoDB. |
db | Database name. |
coll | Collection name. |
value | Changed value of the record. |
where | Condition that identifies the changed record. |
recordType | Record type, such as insert, update, replace, or doc. update and replace both correspond to UPDATE in op. doc indicates that the DELETE operation in op deletes document data rather than view data. |
extra | Extended field. Its value is the same as that of recordType; it is used to extend the oplog record. |
es | Commit time of the record. The value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
clusterTime | Timestamp of the oplog entry associated with the event, in the format timestamp:incre. timestamp is a Unix timestamp in seconds, and incre is the execution sequence number of the command within that second. |
// insert operation
{
  "id": 256,
  "op": "INSERT",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"baz\", \"tags\": [\"mongodb\", \"database\", \"NoSQL\"]}",
  "where": null,
  "recordType": "insert",
  "extra": "insert",
  "es": 1684315111439,
  "ts": 1684315111576,
  "clusterTime": "1684344064:1"
}
// replace operation
{
  "id": 340,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"sss\"}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "replace",
  "extra": "replace",
  "es": 1684315951831,
  "ts": 1684315951961,
  "clusterTime": "1684344904:9"
}
// value update operation
{
  "id": 386,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$set\": {\"c1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316412008,
  "ts": 1684316412146,
  "clusterTime": "1684345365:1"
}
// key update operation
{
  "id": 414,
  "op": "UPDATE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"$unset\": {\"c1\": true}, \"$set\": {\"column1\": \"aaa\"}}",
  "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "recordType": "update",
  "extra": "update",
  "es": 1684316692054,
  "ts": 1684316692184,
  "clusterTime": "1684345648:1"
}
// remove operation
{
  "id": 471,
  "op": "DELETE",
  "dbType": "MongoDB",
  "db": "ljx",
  "coll": "ljx",
  "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}",
  "where": null,
  "recordType": "doc",
  "extra": "doc",
  "es": 1684317252747,
  "ts": 1684317252869,
  "clusterTime": "1684346209:1"
}
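In the DDS samples above, value and where are strings that use MongoDB shell notation (`ObjectId(...)`), so they cannot be passed to a JSON parser unchanged. The Python sketch below shows one way to handle them and to order events by clusterTime. Rewriting `ObjectId("...")` to the Extended JSON form `{"$oid": "..."}` is an assumption of this sketch; other shell types such as ISODate or NumberLong are not covered, and the function names are hypothetical.

```python
import json
import re

# Matches ObjectId("<24 hex digits>"), the only non-JSON notation in the samples above.
_OBJECT_ID = re.compile(r'ObjectId\("([0-9a-fA-F]{24})"\)')


def parse_shell_document(text):
    """Parse a value or where string into a dict; return None for null.

    ObjectId("...") is rewritten to Extended JSON {"$oid": "..."} before parsing.
    """
    if text is None:
        return None
    return json.loads(_OBJECT_ID.sub(r'{"$oid": "\1"}', text))


def parse_cluster_time(cluster_time):
    """Split "timestamp:incre" into (seconds, increment) integers.

    Tuples compare element by element, so the result works as a sort key:
    first by second, then by execution order within that second.
    """
    seconds, increment = cluster_time.split(":")
    return int(seconds), int(increment)


if __name__ == "__main__":
    # The "key update operation" sample, reduced to the fields used here.
    record = json.loads(
        '{"op": "UPDATE", "recordType": "update", "clusterTime": "1684345648:1",'
        ' "value": "{\\"$unset\\": {\\"c1\\": true}, \\"$set\\": {\\"column1\\": \\"aaa\\"}}",'
        ' "where": "{\\"_id\\": ObjectId(\\"64650cf67dc36a464e76e583\\")}"}'
    )
    print(parse_shell_document(record["where"]))
    print(parse_shell_document(record["value"]))
    print(parse_cluster_time(record["clusterTime"]))
```

Sorting by the parsed tuple rather than by the raw string matters once incre reaches two digits: as plain strings, "1684344904:10" would sort before "1684344904:9".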
Oracle, PostgreSQL, GaussDB, and Microsoft SQL Server -> Kafka
Parameter | Description |
---|---|
columnType | Field names and data types in the source table. |
dbType | Source database type. |
schema | Schema name. |
opType | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
id | Sequence number of the event operation, defined by DRS. The value increases monotonically. |
es | Depends on the source database engine. GaussDB primary/standby and GaussDB distributed: commit time of the current transaction. PostgreSQL: commit time of the previous transaction. Oracle and Microsoft SQL Server: commit time of the record. In all cases, the value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
database | Database name. This field is empty when dbType is Oracle. |
table | Table name. |
type | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
isDdl | Whether the operation is a DDL operation. |
sql | SQL statement of the DDL operation. For non-DDL operations, the value is "" (an empty string). |
sqlType | JDBC types of the fields in the source table. |
data | Latest data, as a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. |
old | Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data. |
pkNames | Primary key names. |
{
"columnType": {
"timestamp_column": "timestamp without time zone",
"tstzrange_column": "tstzrange",
"int4range_column": "int4range",
"char_column": "character",
"jsonb_column": "json",
"boolean_column": "boolean",
"bit_column": "bit",
"smallint_column": "smallint",
"bytea_column": "bytea"
},
"dbType": "GaussDB Primary/Standby",
"schema": "schema01",
"opType": "UPDATE",
"id": 332,
"es": 1639626187000,
"ts": 1639629261915,
"database": "database01",
"table": "table01",
"type": "UPDATE",
"isDdl": false,
"sql": "",
"sqlType": {
"timestamp_column": 16,
"tstzrange_column": 46,
"int4range_column": 42,
"char_column": 9,
"jsonb_column": 22,
"boolean_column": 8,
"bit_column": 20,
"smallint_column": 2,
"bytea_column": 15
},
"data": [
{
"timestamp_column": "2021-12-16 12:31:49.344365",
"tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
"boolean_column": "false",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"old": [
{
"timestamp_column": "2014-07-02 06:14:00.742",
"tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
"boolean_column": "true",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"pkNames": null
}
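For an UPDATE message such as the one above, a consumer often needs only the columns whose values changed. The Python sketch below compares each row in old with the corresponding row in data. It assumes both arrays are aligned by index and, as in the sample, that all column values are strings. The helper `decode_hex_bytes` rests on the observation that the sample bytea_column value looks hex-encoded; confirm this for your source before relying on it. Both function names are hypothetical.

```python
import json


def changed_columns(msg):
    """For each UPDATE row, map every changed column to (old value, new value).

    Assumption: old[i] and data[i] describe the same row. Only columns present
    in the new row are compared.
    """
    if msg.get("type") != "UPDATE":
        return []
    result = []
    for before, after in zip(msg.get("old") or [], msg.get("data") or []):
        diff = {
            column: (before.get(column), value)
            for column, value in after.items()
            if before.get(column) != value
        }
        result.append(diff)
    return result


def decode_hex_bytes(value):
    """Decode a hex string such as the bytea_column sample into bytes (assumed encoding)."""
    return bytes.fromhex(value)


if __name__ == "__main__":
    msg = json.loads("""{
      "type": "UPDATE",
      "data": [{"timestamp_column": "2021-12-16 12:31:49.344365", "boolean_column": "false",
                "bytea_column": "62797465615f64617461"}],
      "old": [{"timestamp_column": "2014-07-02 06:14:00.742", "boolean_column": "true",
               "bytea_column": "62797465615f64617461"}]
    }""")
    print(changed_columns(msg))
    print(decode_hex_bytes(msg["data"][0]["bytea_column"]))
```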
JSON-C
JSON-C is similar to JSON, with two differences. For DELETE operations, JSON stores the deleted data in old, whereas JSON-C stores it in data. In addition, JSON-C converts data of the timestamp type into a string in the format yyyy-mm-dd hh:mm:ss.
Parameter | Description |
---|---|
mysqlType | Field names and types in the source table. |
id | Sequence number of the event operation, defined by DRS. The value increases monotonically. |
es | Time when the record was generated in the source database. The value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
database | Database name. For Oracle, the value is the schema name. |
table | Table name. |
type | Operation type, such as DELETE, UPDATE, INSERT, or DDL. |
isDdl | Whether the operation is a DDL operation. |
sql | SQL statement of the DDL operation. For non-DDL operations, the value is "" (an empty string). |
sqlType | JDBC types of the fields in the source table. |
data | Latest data, as a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. If type is DELETE, this is the deleted data. |
old | Old data. If type is UPDATE, this is the data before the update. If type is INSERT, the value is null. |
pkNames | Primary key names. |
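Because JSON and JSON-C place DELETE rows in different fields, a consumer that may receive either format can normalize them in one place. The following Python sketch assumes the consumer already knows which format the synchronization task produces (the hypothetical `fmt` argument). The function names are illustrative, and the timestamp parser covers only the yyyy-mm-dd hh:mm:ss pattern described above, not fractional seconds.

```python
import json
from datetime import datetime


def deleted_rows(msg, fmt):
    """Return the rows removed by a DELETE message in JSON or JSON-C format.

    JSON puts the deleted rows in `old`; JSON-C puts them in `data`.
    `fmt` is supplied by the caller (for example, from the task configuration).
    """
    if fmt not in ("JSON", "JSON-C"):
        raise ValueError(f"unsupported format: {fmt}")
    if msg.get("type") != "DELETE":
        return []
    field = "data" if fmt == "JSON-C" else "old"
    return msg.get(field) or []


def parse_jsonc_timestamp(text):
    """Parse a JSON-C timestamp string in yyyy-mm-dd hh:mm:ss format.

    The result is a naive datetime because the string carries no time zone.
    """
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    jsonc_delete = json.loads('{"type": "DELETE", "data": [{"id": "103"}]}')
    print(deleted_rows(jsonc_delete, "JSON-C"))
    print(parse_jsonc_timestamp("2021-06-25 17:51:53"))
```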
Common Escape Characters in JSON
Character | Escaped form |
---|---|
< | \u003c |
= | \u003d |
> | \u003e |
& | \u0026 |
' | \u0027 |
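The sequences in the table are standard JSON \uXXXX escapes, so a conforming JSON parser converts them back to the original characters and no extra unescaping step should be needed. Here is a short check with Python's standard json module; the sample string is invented for illustration.

```python
import json

# Escape sequences from the table above, keyed by the character they represent.
ESCAPES = {"<": "\\u003c", "=": "\\u003d", ">": "\\u003e", "&": "\\u0026", "'": "\\u0027"}

# A JSON string literal (including its quotes) that uses several of these escapes.
escaped = '"a \\u003c b \\u003d\\u003d c \\u003e d \\u0026 \\u0027e\\u0027"'

decoded = json.loads(escaped)
print(decoded)  # a < b == c > d & 'e'

# Every escape in the table decodes to its character.
for char, sequence in ESCAPES.items():
    assert json.loads('"' + sequence + '"') == char
```

Python's json.dumps does not emit these escapes by default, which is harmless: parsers treat the escaped and unescaped forms as the same string.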