Updated on 2024-04-26 GMT+08:00

GDS-Kafka Data Access

GDS-Kafka consumes data from Kafka and caches it. When the cache time or the cached data size reaches a preconfigured threshold, GDS-Kafka copies the cached data to a GaussDB(DWS) temporary table, and then inserts or merges the data from the temporary table into the target table.
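The threshold behavior described above can be pictured with a small buffer that releases a batch when either limit is reached. This is only a sketch: the class name ThresholdBuffer and the parameters maxRecords and maxAgeMillis are hypothetical and do not correspond to actual GDS-Kafka configuration items, and the size threshold is simplified to a record count.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative cache that releases a batch once a size or age threshold is reached. */
final class ThresholdBuffer {
    private final int maxRecords;     // hypothetical size threshold (record count)
    private final long maxAgeMillis;  // hypothetical time threshold
    private final List<String> cache = new ArrayList<>();
    private long firstRecordAt = -1;  // arrival time of the oldest cached record

    ThresholdBuffer(int maxRecords, long maxAgeMillis) {
        this.maxRecords = maxRecords;
        this.maxAgeMillis = maxAgeMillis;
    }

    /**
     * Caches one record. Returns the batch that would be copied to the
     * temporary table if a threshold is reached, or an empty list otherwise.
     */
    List<String> add(String record, long nowMillis) {
        if (cache.isEmpty()) {
            firstRecordAt = nowMillis;
        }
        cache.add(record);
        boolean full = cache.size() >= maxRecords;
        boolean stale = nowMillis - firstRecordAt >= maxAgeMillis;
        if (!full && !stale) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(cache);
        cache.clear();
        return batch;
    }
}
```

The sketch checks the age threshold only when a record arrives. A real consumer would presumably also check it on a timer so that a quiet topic still gets flushed.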

  1. The format of data generated by the Kafka message producer is specified by the kafka.source.event.type parameter. For details, see Message Formats Supported by GDS-Kafka.
  2. GDS-Kafka can insert data directly into tables without primary keys, or update data by merging. Direct insert performs better because it involves no update operations. Choose the mode based on the target table type in GaussDB(DWS). The import mode is determined by the app.insert.directly parameter and by whether the target table has a primary key. For details, see GDS-Kafka Import Modes.
  • GDS-Kafka only allows lowercase target table and column names.
  • GDS-Kafka deletes historical data based on pos in the extended field. If the imported data includes delete operations, the extended field must be used.
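As a rough illustration of pos-based deletion, the sketch below applies one delete message to an in-memory stand-in for a target table with a primary key, following the delete rules listed in Table 2. The class, the Row type, and the applyDelete method are hypothetical. Two points are assumptions rather than documented behavior: a delete that matches a row whose pos is not smaller is ignored, and with delflag enabled an unmatched delete inserts a row already flagged as deleted.

```java
import java.util.HashMap;
import java.util.Map;

final class PosDeleteSketch {
    /** One target-table row, reduced to its pos and its delete flag. */
    static final class Row {
        final long pos;
        final int delFlag;
        Row(long pos, int delFlag) {
            this.pos = pos;
            this.delFlag = delFlag;
        }
    }

    /** Applies a delete message with the given primary key and pos to the target map. */
    static void applyDelete(Map<String, Row> target, String key, long pos, boolean useDelFlag) {
        Row existing = target.get(key);
        if (existing == null) {
            if (useDelFlag) {
                target.put(key, new Row(pos, 1)); // assumption: inserted row is flagged
            }
            return;
        }
        if (existing.pos >= pos) {
            return; // assumption: target row is not older than the message, so keep it
        }
        if (useDelFlag) {
            target.put(key, new Row(pos, 1)); // mark as deleted instead of removing
        } else {
            target.remove(key);               // physically delete the row
        }
    }

    public static void main(String[] args) {
        Map<String, Row> target = new HashMap<>();
        target.put("1", new Row(5, 0));
        applyDelete(target, "1", 9, false);
        System.out.println(target.containsKey("1"));
    }
}
```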

Message Formats Supported by GDS-Kafka

Table 1 Message formats supported by GDS-Kafka

kafka.source.event.type | Format | Description

cdc.drs.avro | Internal format of Huawei Cloud DRS. DRS generates data in the avro format used by Kafka. GDS-Kafka can directly interconnect with DRS to parse and import the data. | None

drs.cdc

To use the avro format for drs.cdc, add the GDS-Kafka-common and GDS-Kafka-source Maven dependencies to the upstream program that produces messages to Kafka, and then create and populate a Record object. A Record object represents one table record. It is serialized into a byte[] array, sent to Kafka by the producer, and consumed by the downstream GDS-Kafka.

In the following example, the target table is the person table in the public schema. The person table consists of the id, name, and age fields. The operation is U, which indicates an update. This example changes the name field from a to b and the age field from 18 to 20 in the record whose id is 0.

Record record = new Record();
// Set the schema and table name of the target table.
record.setTableName("public.person");
// Set the field list.
List<Field> fields = new ArrayList<>();
fields.add(new Field("id", 0));
fields.add(new Field("name", 1));
fields.add(new Field("age", 2));
record.setFields(fields);
// Set the field value list before the table record is updated.
List<Object> before = new ArrayList<>();
before.add(new Integer(0, "0"));
before.add(new Character("utf-8", ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8))));
before.add(new Integer(0, "18"));
record.setBeforeImages(before);
// Set the field value list after the table record is updated.
List<Object> after = new ArrayList<>();
after.add(new Integer(0, "0"));
after.add(new Character("utf-8", ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8))));
after.add(new Integer(0, "20"));
record.setAfterImages(after);
// Set the operation type.
record.setOperation("U");
// Set the operation time.
record.setUpdateTimestamp(325943905);
// Serialize the record object into a byte[] array.
byte[] msg = Record.getEncoder().encode(record).array();

Standard avro format:

  • The tableName field describes the target table and schema names that the current record belongs to. [Mandatory]
  • The operation field describes the operation type of the current record. I indicates insert, U indicates update, and D indicates delete. [Mandatory]
  • updateTimestamp indicates the time when the operation was performed on the source end. [Optional]
  • The beforeImages list describes the record before it is updated or deleted. The fields in beforeImages correspond to those in the target table. [Mandatory for U/D]
  • The afterImages list describes the updated or newly inserted information of the current record. [Mandatory for U/D]
  • The fields list describes the fields of the current table record. The index values of the fields must follow the same sequence as the values in beforeImages and afterImages. [Mandatory]

cdc.json

In the following example, the target table is the person table in the public schema. The person table consists of the id, name, and age fields. The op_type is U, which indicates an update operation. This example changes the name field from a to b in the record with the ID 1, and changes the value of the age field from 18 to 20.

{
  "table": "public.person",
  "op_type": "U",
  "op_ts": "1668426344",
  "current_ts": "1668426344",
  "before": {
    "id": "1",
    "name": "a",
    "age": 18
  },
  "after": {
    "id": "1",
    "name": "b",
    "age": 20
  }
}

Standard JSON format:

  • The table field describes the target table and schema names that the current record belongs to. [Mandatory]
  • The op_type field describes the operation type of the current record. I indicates insert, U indicates update, and D indicates delete. [Mandatory]
  • op_ts indicates the time when the operation was performed on the source end. [Optional]
  • current_ts indicates the time when the message was imported to Kafka. [Optional]
  • The before object describes the record before it is updated or deleted. The fields in the before object correspond to those in the target table. [Mandatory for U/D]
  • The after object describes the updated or newly inserted information of the current record. [Mandatory for U/D]
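On the producer side, a cdc.json update message with the fields listed above could be assembled roughly as follows. This is a sketch using only the Java standard library. The class CdcJsonSketch and its methods are hypothetical helpers, not part of GDS-Kafka, and the escaping covers only quotes and backslashes, so a real producer would normally use a JSON library instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CdcJsonSketch {
    /** Minimal escaping: handles only backslashes and double quotes. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Renders a flat map as a JSON object; numbers stay bare, other values are quoted. */
    static String toJsonObject(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append('"').append(escape(String.valueOf(v))).append('"');
            }
        }
        return sb.append('}').toString();
    }

    /** Builds an update (op_type U) message carrying both before and after images. */
    static String updateMessage(String table, long opTs,
                                Map<String, Object> before, Map<String, Object> after) {
        return "{\"table\":\"" + escape(table) + "\","
            + "\"op_type\":\"U\","
            + "\"op_ts\":\"" + opTs + "\","
            + "\"before\":" + toJsonObject(before) + ","
            + "\"after\":" + toJsonObject(after) + "}";
    }

    public static void main(String[] args) {
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", "1");
        before.put("name", "a");
        before.put("age", 18);
        Map<String, Object> after = new LinkedHashMap<>(before);
        after.put("name", "b");
        after.put("age", 20);
        System.out.println(updateMessage("public.person", 1668426344L, before, after));
    }
}
```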

industrial.iot.json

{
  "header": {
    "thing_id": "a0001",
    "instance_id": "1",
    "thing_model_name": "computer",
    "timestamp": "1668426344"
  },
  "body": {
    "status": "Normal",
    "temperature": "10",
    "working_time": "10000"
  }
}

IoT data format:

  • thing_model_name in the header indicates the table name. [Mandatory]
  • The values of thing_id, instance_id, and timestamp in header and the content in the body comprise the fields of the current record. [Mandatory]
  • IoT data is time series data and does not involve update or deletion. Only insert operations are involved.
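The mapping from an industrial.iot.json message to a table record, as described in the list above, can be sketched as follows. The class IotRowSketch and its methods are hypothetical, and the message is assumed to be already parsed into two string maps. The column order shown is an illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IotRowSketch {
    /** The target table name comes from thing_model_name in the header. */
    static String targetTable(Map<String, String> header) {
        return header.get("thing_model_name");
    }

    /** The record is thing_id, instance_id, and timestamp plus every body entry. */
    static Map<String, String> toRow(Map<String, String> header, Map<String, String> body) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("thing_id", header.get("thing_id"));
        row.put("instance_id", header.get("instance_id"));
        row.put("timestamp", header.get("timestamp"));
        row.putAll(body);
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> header = new LinkedHashMap<>();
        header.put("thing_id", "a0001");
        header.put("instance_id", "1");
        header.put("thing_model_name", "computer");
        header.put("timestamp", "1668426344");
        Map<String, String> body = new LinkedHashMap<>();
        body.put("status", "Normal");
        body.put("temperature", "10");
        body.put("working_time", "10000");
        System.out.println(targetTable(header) + " <- " + toRow(header, body));
    }
}
```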

industrial.iot.recursion.json

{
  "header": {
    "thing_id": "a0001",
    "instance_id": "1",
    "thing_model_name": "computer",
    "timestamp": "1668426344"
  },
  "body": {
    "status": "Normal",
    "temperature": "10",
    "property": {
      "key1": "1",
      "key2": 2
    },
    "working_time": "10000"
  }
}

IoT data format:

  • thing_model_name in the header indicates the table name. [Mandatory]
  • The values of thing_id, instance_id, and timestamp in header and the content in the body comprise the fields of the current record. [Mandatory]
  • IoT data is time series data and does not involve update or deletion. Only insert operations are involved.
  • In this data format, each key-value pair in body is written to the property and value fields of a separate new record, so one message generates multiple records. In this way, the columns of one message are converted to rows.
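The conversion described in the last item can be sketched as below, where each body entry becomes one record that carries the header fields plus a property and a value column. The class IotPivotSketch is hypothetical. How nested objects are named in the output is not stated here, so joining nested keys with a dot (for example property.key1) is purely an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IotPivotSketch {
    /** Produces one record per leaf entry of body. */
    static List<Map<String, String>> pivot(Map<String, String> header, Map<String, Object> body) {
        List<Map<String, String>> rows = new ArrayList<>();
        collect(header, "", body, rows);
        return rows;
    }

    private static void collect(Map<String, String> header, String prefix,
                                Map<String, Object> node, List<Map<String, String>> rows) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            // Assumption: nested keys are joined with '.' to form the property name.
            String name = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object v = e.getValue();
            if (v instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) v;
                collect(header, name, child, rows); // recurse into nested objects
            } else {
                Map<String, String> row = new LinkedHashMap<>();
                row.put("thing_id", header.get("thing_id"));
                row.put("instance_id", header.get("instance_id"));
                row.put("timestamp", header.get("timestamp"));
                row.put("property", name);
                row.put("value", String.valueOf(v));
                rows.add(row);
            }
        }
    }
}
```

Applied to the example message above, this sketch yields five records: status, temperature, property.key1, property.key2, and working_time.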

industrial.iot.event.json.independent.table

{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2
    }
}

IoT event stream data format:

  • event_name indicates the table name. [Mandatory]
  • event_id, start_time, end_time, and the entries in fields make up the fields of the current record. [Mandatory]
  • IoT event stream data is time series data and does not involve update or deletion. Only insert operations are involved.

industrial.iot.json.multi.events

{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2,
    "field3":{
       "key1":"1",
       "key2":2
       }
    }
}

IoT event stream data format:

  • event_name indicates the table name. [Mandatory]
  • event_id, start_time, end_time, and the entries in fields make up the fields of the current record. [Mandatory]
  • IoT event stream data is time series data and does not involve update or deletion. Only insert operations are involved.
  • In this data format, each key-value pair in fields is written to the field_name and field_value fields of a separate new record, so one message generates multiple records. In this way, the columns of one message are converted to rows.

GDS-Kafka Import Modes

To import data into the database, GDS-Kafka first copies the data to a temporary table, and then merges or inserts the data into the target table. The following table describes the import mode used for each operation and scenario.

Table 2 GDS-Kafka import modes

Operation | app.insert.directly | Primary Key Table | Import Mode
insert | true (only for tables without primary keys) | No | Use INSERT SELECT to write data from the temporary table to the target table.
insert | false | Yes | Merge data from the temporary table to the target table based on the primary key.
insert | false | No | Use INSERT SELECT to write data from the temporary table to the target table.
delete | true (only for tables without primary keys) | No | Use INSERT SELECT to write data from the temporary table to the target table.
delete | false (see NOTE 1) | Yes | If the delflag field is set, merge will be performed based on the primary key. If a matched primary key is found, and the value of pos in the target table is smaller than that in the temporary table, the delflag field will be set to 1. Otherwise, a new record will be inserted. If the delflag field is not set, a matched primary key is found, and the value of pos in the target table is smaller than that in the temporary table, the record will be deleted from the target table.
delete | false (see NOTE 1) | No | If the delflag field is set, all the fields in the temporary table will be used to match and merge with the target table. If a matched record is found, and the value of pos in the target table is smaller than that in the temporary table, the delflag field will be set to 1. Otherwise, a new record will be inserted. If the delflag field is not set, all the fields in the temporary table will be used to match the target table. If a matched record is found, and the value of pos in the target table is smaller than that in the temporary table, the matched record will be deleted from the target table.
update | true (only for tables without primary keys) | No | Use INSERT SELECT to write data from the temporary table to the target table.
update | false (see NOTE 2) | Yes | Equivalent to the insert+delete operation on a table with a primary key.
update | false (see NOTE 2) | No | Equivalent to the insert+delete operation on a table without a primary key.

NOTE 1: You can mark deletion by configuring the app.del.flag parameter. The flag of a deleted record will be set to 1.

NOTE 2: The update operation is split. The message in before or beforeImage is processed as a delete operation, and the message in after or afterImage is processed as an insert operation. Then, the message is saved to the database based on the insert and delete operations.
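The decision logic in Table 2 can be summarized in a few lines of code. The sketch below is an illustration only: the class ImportModeSketch, the Step enum, and the plan method are hypothetical names. Rejecting app.insert.directly=true for a table with a primary key, and running the delete step before the insert step for an update, are assumptions of this sketch rather than documented behavior.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ImportModeSketch {
    enum Step { INSERT_SELECT, MERGE_ON_PRIMARY_KEY, DELETE_BY_PRIMARY_KEY, DELETE_BY_ALL_FIELDS }

    /** Returns the steps applied when moving data from the temporary table to the target table. */
    static List<Step> plan(String operation, boolean insertDirectly, boolean hasPrimaryKey) {
        if (insertDirectly) {
            if (hasPrimaryKey) {
                // Assumption: the table only says this setting is for tables without primary keys.
                throw new IllegalArgumentException(
                    "app.insert.directly=true is only for tables without primary keys");
            }
            return Collections.singletonList(Step.INSERT_SELECT);
        }
        Step insertStep = hasPrimaryKey ? Step.MERGE_ON_PRIMARY_KEY : Step.INSERT_SELECT;
        Step deleteStep = hasPrimaryKey ? Step.DELETE_BY_PRIMARY_KEY : Step.DELETE_BY_ALL_FIELDS;
        switch (operation) {
            case "insert":
                return Collections.singletonList(insertStep);
            case "delete":
                return Collections.singletonList(deleteStep);
            case "update":
                // An update is split: the before image is a delete, the after image an insert.
                return Arrays.asList(deleteStep, insertStep);
            default:
                throw new IllegalArgumentException("unknown operation: " + operation);
        }
    }

    public static void main(String[] args) {
        System.out.println(plan("update", false, true));
    }
}
```

Whether a delete step flags or physically removes the matched record depends on whether delflag is set, which this sketch leaves out.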