Kafka Message Format
Data synchronized to the Kafka cluster can be stored in Avro, JSON, or JSON-C format.
Avro Format
For the schema definition in Avro format, see record.rar. After data is synchronized to the Kafka cluster, parse it according to the Avro schema definition. For details about data parsing, see drs-avro-read.rar.
JSON
Parameter | Description |
---|---|
mysqlType | Field names and types in the source table. |
id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
es | Time when the record was generated in the source database. The value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
database | Database name. |
table | Table name. |
type | Operation type, such as DELETE, UPDATE, INSERT, and DDL. |
isDdl | Whether the operation is a DDL operation. |
sql | SQL statement of a DDL operation. The value is "" in the UPDATE example in this section. |
sqlType | JDBC types of the fields in the source table. |
data | Latest data, which is a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. |
old | Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data. |
pkNames | Primary key name. |
{
"mysqlType": {
"c11": "binary",
"c10": "varchar",
"c13": "text",
"c12": "varbinary",
"c14": "blob",
"c1": "varchar",
"c2": "varbinary",
"c3": "int",
"c4": "datetime",
"c5": "timestamp",
"c6": "char",
"c7": "float",
"c8": "double",
"c9": "decimal",
"id": "int"
},
"id": 27677,
"es": 1624614713000,
"ts": 1625058726990,
"database": "test01",
"table": "test ",
"type": "UPDATE",
"isDdl": false,
"sql": "",
"sqlType": {
"c11": -2,
"c10": 12,
"c13": -1,
"c12": -3,
"c14": 2004,
"c1": 12,
"c2": -3,
"c3": 4,
"c4": 94,
"c5": 93,
"c6": 1,
"c7": 6,
"c8": 8,
"c9": 3,
"id": 4
},
"data": [
{
"c11": "[]",
"c10": "Huawei Cloud huaweicloud",
"c13": "asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
"c12": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c14": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c1": "cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
"c2": "[]",
"c3": "103",
"c4": "2021-06-25 17:51:53",
"c5": "1624614713.201",
"c6": "!@#$%90weurtg103",
"c7": "10357.0",
"c8": "1.2510357E7",
"c9": "9874510357",
"id": "104"
}
],
"old": [
{
"c11": "[]",
"c10": "Huawei Cloud huaweicloud",
"c13": "asfiajhfiaf939-0239",
"c12": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c14": "[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c1": "cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
"c2": "[]",
"c3": "103",
"c4": "2021-06-25 17:51:53",
"c5": "1624614713.201",
"c6": "!@#$%90weurtg103",
"c7": "10357.0",
"c8": "1.2510357E7",
"c9": "9874510357",
"id": "103"
}
],
"pkNames": [
"id"
]
}
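As an illustration of how a consumer might read a message in this format, the following Python sketch parses one message, converts es and ts to UTC times, and lists the columns whose values differ between data and old. The function name summarize_event and the shortened sample message are hypothetical, and the sketch assumes that data[i] and old[i] describe the same row for an UPDATE.

```python
import json
from datetime import datetime, timezone


def summarize_event(raw: str) -> dict:
    """Summarize one JSON-format message (hypothetical helper, not part of DRS)."""
    event = json.loads(raw)

    # es and ts are 13-digit Unix timestamps in milliseconds.
    source_time = datetime.fromtimestamp(event["es"] / 1000, tz=timezone.utc)
    kafka_time = datetime.fromtimestamp(event["ts"] / 1000, tz=timezone.utc)

    changed = []
    if event["type"] == "UPDATE":
        # Assumption: data[i] and old[i] refer to the same row.
        for new_row, old_row in zip(event["data"] or [], event["old"] or []):
            changed.append(sorted(k for k in new_row if new_row[k] != old_row.get(k)))

    return {
        "table": f'{event["database"]}.{event["table"]}',
        "type": event["type"],
        "source_time": source_time.isoformat(),
        "kafka_time": kafka_time.isoformat(),
        "lag_ms": event["ts"] - event["es"],
        "changed_columns": changed,
    }


# Shortened, hypothetical message that reuses the field names documented above.
sample = json.dumps({
    "id": 27677,
    "es": 1624614713000,
    "ts": 1625058726990,
    "database": "test01",
    "table": "test",
    "type": "UPDATE",
    "isDdl": False,
    "sql": "",
    "data": [{"c3": "103", "c13": "new text", "id": "104"}],
    "old": [{"c3": "103", "c13": "old text", "id": "103"}],
    "pkNames": ["id"],
})

summary = summarize_event(sample)
print(summary)
```

The lag_ms value is simply ts minus es, that is, the delay between the change in the source database and the write to Kafka.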
Parameter | Description |
---|---|
columnType | Field names and data types in the source table. |
dbType | Source database type. The engine types are as follows: GaussDB Primary/Standby, GaussDB Distributed, PostgreSQL, and Oracle. |
schema | Schema name. |
opType | Operation type, such as DELETE, UPDATE, INSERT, and DDL. |
id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
es | The meaning depends on the source DB engine. GaussDB primary/standby, GaussDB distributed, and PostgreSQL: commit time of the previous transaction. Oracle: commit time of the record. In all cases, the value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
database | Database name. The value is empty when dbType is Oracle. |
table | Table name. |
type | Operation type, such as DELETE, UPDATE, INSERT, and DDL. |
isDdl | Whether the operation is a DDL operation. |
sql | SQL statement of a DDL operation. The value is "" in the UPDATE example in this section. |
sqlType | JDBC types of the fields in the source table. |
data | Latest data, which is a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. |
old | Old data. If type is UPDATE, this is the data before the update. If type is DELETE, this is the deleted data. |
pkNames | Primary key name. |
{
"columnType": {
"timestamp_column": "timestamp without time zone",
"tstzrange_column": "tstzrange",
"int4range_column": "int4range",
"char_column": "character",
"jsonb_column": "json",
"boolean_column": "boolean",
"bit_column": "bit",
"smallint_column": "smallint",
"bytea_column": "bytea"
},
"dbType": "GaussDB Primary/Standby",
"schema": "schema01",
"opType": "UPDATE",
"id": 332,
"es": 1639626187000,
"ts": 1639629261915,
"database": "database01",
"table": "table01",
"type": "UPDATE",
"isDdl": false,
"sql": "",
"sqlType": {
"timestamp_column": 16,
"tstzrange_column": 46,
"int4range_column": 42,
"char_column": 9,
"jsonb_column": 22,
"boolean_column": 8,
"bit_column": 20,
"smallint_column": 2,
"bytea_column": 15
},
"data": [
{
"timestamp_column": "2021-12-16 12:31:49.344365",
"tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
"boolean_column": "false",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"old": [
{
"timestamp_column": "2014-07-02 06:14:00.742",
"tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
"boolean_column": "true",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"pkNames": null
}
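In the example above, every column value arrives as a string, so a consumer typically converts values using columnType. The Python sketch below shows one possible conversion for a few of the types in the example. The function decode_row is hypothetical, and the encodings are assumptions inferred from the sample only: bytea as hexadecimal text, boolean as "true" or "false", and json as embedded JSON text.

```python
import json


def decode_row(row: dict, column_types: dict) -> dict:
    """Convert string column values to Python values (hypothetical helper)."""
    decoded = {}
    for name, value in row.items():
        col_type = column_types.get(name)
        if value is None:
            decoded[name] = None
        elif col_type == "bytea":
            # Assumption: bytea is hex-encoded, as the sample value suggests.
            decoded[name] = bytes.fromhex(value)
        elif col_type == "json":
            decoded[name] = json.loads(value)
        elif col_type == "boolean":
            decoded[name] = value == "true"
        elif col_type == "smallint":
            decoded[name] = int(value)
        else:
            # Types not handled here are passed through unchanged.
            decoded[name] = value
    return decoded


# Values copied from the "data" row and "columnType" map of the example above.
column_types = {
    "jsonb_column": "json",
    "boolean_column": "boolean",
    "smallint_column": "smallint",
    "bytea_column": "bytea",
    "char_column": "character",
}
row = {
    "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}",
    "boolean_column": "false",
    "smallint_column": "12",
    "bytea_column": "62797465615f64617461",
    "char_column": "g",
}

decoded = decode_row(row, column_types)
print(decoded)
```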
JSON-C
JSON-C is similar to JSON. The main difference lies in the DELETE operation: JSON stores the deleted data in old, whereas JSON-C stores it in data. In JSON-C, data of the timestamp type is also converted into a character string in the format yyyy-mm-dd hh:mm:ss.
Parameter | Description |
---|---|
mysqlType | Field names and types in the source table. |
id | Sequence number of an event operation defined in DRS. The value increases monotonically. |
es | Time when the record was generated in the source database. The value is a 13-digit Unix timestamp in milliseconds. |
ts | Time when the data was written to the target Kafka. The value is a 13-digit Unix timestamp in milliseconds. |
database | Database name. For an Oracle database, the value is the schema. |
table | Table name. |
type | Operation type, such as DELETE, UPDATE, INSERT, and DDL. |
isDdl | Whether the operation is a DDL operation. |
sql | SQL statement of a DDL operation. The value is "" when the message does not carry a DDL statement. |
sqlType | JDBC types of the fields in the source table. |
data | Latest data, which is a JSON array. If type is INSERT, this is the inserted data. If type is UPDATE, this is the data after the update. If type is DELETE, this is the deleted data. |
old | Old data. If type is UPDATE, this is the data before the update. If type is INSERT, the value is null. |
pkNames | Primary key name. |
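Because the two formats place deleted rows in different fields, a consumer that handles both needs to know which format a topic uses. The following Python sketch shows one way to do this. The function deleted_rows and its message_format argument are hypothetical: the format is assumed to be known from the task configuration, not read from the message.

```python
def deleted_rows(event: dict, message_format: str) -> list:
    """Return the rows removed by a DELETE event (hypothetical helper).

    message_format is "JSON" or "JSON-C" and is supplied by the caller.
    """
    if event.get("type") != "DELETE":
        return []
    if message_format == "JSON":
        key = "old"   # JSON stores deleted data in old
    elif message_format == "JSON-C":
        key = "data"  # JSON-C stores deleted data in data
    else:
        raise ValueError(f"unsupported format: {message_format}")
    return event.get(key) or []


# Hypothetical minimal DELETE events in each format.
json_event = {"type": "DELETE", "data": None, "old": [{"id": "103"}]}
json_c_event = {"type": "DELETE", "data": [{"id": "103"}], "old": None}

print(deleted_rows(json_event, "JSON"))
print(deleted_rows(json_c_event, "JSON-C"))
```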
Common Escape Characters in JSON
Character | Escape character |
---|---|
< | \u003c |
> | \u003e |
& | \u0026 |
= | \u003d |
' | \u0027 |
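These are standard JSON Unicode escapes, so a JSON parser restores the original characters and no extra unescaping step is normally needed. A minimal Python check with a hypothetical field named expr:

```python
import json

# Raw message text as it might appear on the wire, with escaped characters.
raw = r'{"expr": "a \u003c b \u0026 c \u003d \u0027d\u0027 \u003e e"}'

event = json.loads(raw)
print(event["expr"])
```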