GDS-Kafka Data Access
GDS-Kafka consumes and caches data from Kafka. When the cache time or cache size reaches a preconfigured threshold, GDS-Kafka copies the cached data to a temporary table in GaussDB(DWS) and then uses the temporary table to insert or update data in the target table.
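The threshold check described above can be modeled as a small buffer. This is a minimal Python sketch, assuming the size threshold is measured in bytes. The class CacheFlushPolicy and its max_bytes and max_age_seconds fields are hypothetical illustrations, not GDS-Kafka configuration parameters.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class CacheFlushPolicy:
    """Hypothetical model of a cache that is flushed when either threshold is reached.

    max_bytes and max_age_seconds are illustrative names, not GDS-Kafka parameters.
    """

    max_bytes: int
    max_age_seconds: float
    clock: Callable[[], float] = time.monotonic
    _records: List[bytes] = field(default_factory=list, init=False)
    _size: int = field(default=0, init=False)
    _first_arrival: Optional[float] = field(default=None, init=False)

    def add(self, record: bytes) -> None:
        # The cache age is measured from the first record of the current batch.
        if not self._records:
            self._first_arrival = self.clock()
        self._records.append(record)
        self._size += len(record)

    def should_flush(self) -> bool:
        # Flush when the cached size OR the cache age reaches its threshold.
        if not self._records:
            return False
        if self._size >= self.max_bytes:
            return True
        return self.clock() - self._first_arrival >= self.max_age_seconds

    def drain(self) -> List[bytes]:
        # Hand over the batch (for example, to be copied into a temporary table) and reset.
        batch = self._records
        self._records, self._size, self._first_arrival = [], 0, None
        return batch
```

Injecting the clock keeps the time threshold testable without waiting in real time.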
- The format of data generated by the Kafka message producer is specified by the kafka.source.event.type parameter. For details, see Message Formats Supported by GDS-Kafka.
- GDS-Kafka can insert data directly into tables without primary keys, or update data by merging. Direct insert performs better because it involves no update operations. Choose the update mode based on the type of the target table in GaussDB(DWS). The import mode is determined by the app.insert.directly parameter and whether the target table has a primary key. For details, see GDS-Kafka Import Modes.
- GDS-Kafka supports only lowercase names for target tables and columns.
- GDS-Kafka deletes historical data based on the pos value in the extended field. If the imported data includes delete operations, the extended field must be used.
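For the insert operation, the dependency of the import mode on app.insert.directly and the primary key (detailed in GDS-Kafka Import Modes) can be expressed as a small decision function. This Python sketch is illustrative only: choose_import_mode is a hypothetical helper, and raising an error when app.insert.directly is true for a table with a primary key is an assumption drawn from the statement that true applies only to tables without primary keys.

```python
def choose_import_mode(insert_directly: bool, has_primary_key: bool) -> str:
    """Hypothetical helper: how rows of an insert move from the temporary table to the target table."""
    if insert_directly:
        # Assumption: app.insert.directly=true is rejected for tables with a primary key.
        if has_primary_key:
            raise ValueError("app.insert.directly=true applies only to tables without primary keys")
        return "INSERT SELECT"
    # With app.insert.directly=false, a primary key enables merging; otherwise plain INSERT SELECT.
    return "MERGE" if has_primary_key else "INSERT SELECT"
```

Direct insert skips the key lookup that a merge needs, which is why it performs better on tables without primary keys.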
Message Formats Supported by GDS-Kafka
| kafka.source.event.type | Description | Example |
|---|---|---|
| cdc.drs.avro | Internal format of Huawei Cloud DRS. DRS generates data in the Avro format used by Kafka, and GDS-Kafka can interconnect directly with DRS to parse and import the data. | None |
| drs.cdc | To use the Avro format for drs.cdc, add the Maven dependencies of GDS-Kafka-common and GDS-Kafka-source to the upstream Kafka producer program, and then create and populate a Record object. Each Record object represents one table record. It is serialized into a byte[] array, sent to Kafka by the producer, and consumed by GDS-Kafka downstream. In the example, the target table is the person table in the public schema, which consists of the id, name, and age fields. The op_type is U, which indicates an update operation. The example changes the name field from a to b and the age field from 18 to 20 in the record whose ID is 0. | Standard Avro format |
| cdc.json | In the example, the target table is the person table in the public schema, which consists of the id, name, and age fields. The op_type is U, which indicates an update operation. The example changes the name field from a to b and the age field from 18 to 20 in the record whose ID is 1. | Standard JSON format |
| industrial.iot.json | | IoT data format |
| industrial.iot.recursion.json | | IoT data format |
| industrial.iot.event.json.independent.table | | IoT event stream data format |
| industrial.iot.json.multi.events | | IoT event stream data format |
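The update described in the cdc.json row can be illustrated with a hypothetical message. This Python sketch does not reproduce the actual cdc.json schema: op_type comes from the table, before and after come from the NOTE on update operations in GDS-Kafka Import Modes, and every other field name (such as table) is an assumption. The helper changed_columns is also hypothetical; it reports which columns an update changes.

```python
import json
from typing import Any, Dict, Tuple

# Hypothetical message layout; only op_type, before, and after are taken from this page.
message = json.dumps({
    "table": "public.person",  # assumed field name
    "op_type": "U",            # U indicates an update operation
    "before": {"id": 1, "name": "a", "age": 18},
    "after": {"id": 1, "name": "b", "age": 20},
})


def changed_columns(raw: str) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old, new)} for every column whose value differs in an update."""
    record = json.loads(raw)
    if record["op_type"] != "U":
        return {}
    before, after = record["before"], record["after"]
    return {col: (before.get(col), after[col]) for col in after if before.get(col) != after[col]}
```

For the message above, only name and age are reported; id is unchanged and identifies the record.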
GDS-Kafka Import Modes
To import data into the database, GDS-Kafka copies the data to a temporary table and then merges or inserts it into the target table. The following table describes the import mode used for each combination of operation, app.insert.directly setting, and primary key.
| Operation | app.insert.directly | Target Table Has Primary Key | Import Mode |
|---|---|---|---|
| insert | true (only for tables without primary keys) | No | Use INSERT SELECT to write data from the temporary table to the target table. |
| insert | false | Yes | Merge data from the temporary table into the target table based on the primary key. |
| insert | false | No | Use INSERT SELECT to write data from the temporary table to the target table. |
| delete | true (only for tables without primary keys) | No | Use INSERT SELECT to write data from the temporary table to the target table. |
| delete | false | Yes | |
| delete | false | No | |
| update | true (only for tables without primary keys) | No | Use INSERT SELECT to write data from the temporary table to the target table. |
| update | false | Yes | Equivalent to insert+delete operations on a table with a primary key. |
| update | false | No | Equivalent to insert+delete operations on a table without a primary key. |

NOTE:
- For delete operations with app.insert.directly set to false, you can mark deletions by configuring the app.del.flag parameter. The flag of a deleted record is set to 1.
- For update operations with app.insert.directly set to false, the update is split: the message in before or beforeImage is processed as a delete operation, and the message in after or afterImage is processed as an insert operation. The resulting delete and insert operations are then applied to the database.
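The NOTE on update operations can be modeled in memory. This is a conceptual Python sketch, not GDS-Kafka's implementation: split_message, apply_to_pk_table, and apply_to_heap_table are hypothetical helpers, and a dict keyed by primary key and a plain list stand in for tables with and without a primary key.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Op = Tuple[str, Row]


def split_message(operation: str, before: Optional[Row], after: Optional[Row]) -> List[Op]:
    """Hypothetical helper: express each operation as delete/insert steps."""
    if operation == "insert":
        return [("insert", after)]
    if operation == "delete":
        return [("delete", before)]
    if operation == "update":
        # The before image is processed as a delete, the after image as an insert.
        return [("delete", before), ("insert", after)]
    raise ValueError(f"unsupported operation: {operation}")


def apply_to_pk_table(table: Dict[Any, Row], ops: List[Op], pk: str) -> None:
    # With a primary key, the old row is located by its key value alone.
    for kind, row in ops:
        if kind == "delete":
            table.pop(row[pk], None)
        else:
            table[row[pk]] = dict(row)


def apply_to_heap_table(rows: List[Row], ops: List[Op]) -> None:
    # Without a primary key, the before image must match an existing row on every column.
    for kind, row in ops:
        if kind == "delete":
            if row in rows:
                rows.remove(row)  # removes only the first matching row
        else:
            rows.append(dict(row))
```

The two models show why the primary key matters: with a key, the delete step needs only the key value, while without one it has to match the whole before image.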