Updated on 2024-06-20 GMT+08:00

Managing Instances

Overview

Data migration provides independent clusters for secure and reliable data migration. Clusters are isolated from each other and cannot access each other. With instance management, you can easily create and manage clusters by purchasing GDS-Kafka instances. GDS-Kafka consumes and caches data from Kafka. If the data cache time or size reaches a preconfigured threshold, GDS-Kafka will copy the data to a GaussDB(DWS) temporary table, and then insert or update data in the temporary table.

  1. The format of data generated by the Kafka message producer is specified by the kafka.source.event.type parameter. For details, see Message Formats Supported by GDS-Kafka.
  2. In GDS-Kafka, you can directly insert data for tables without primary keys, or update data by merging. Direct insert can achieve better performance, because it does not involve update operations. Determine your update mode based on the target table type in GaussDB(DWS). The data import mode is determined by the app.insert.directly parameter and whether a primary key exists. For details, see GDS-Kafka Data Import Modes.
  • GDS-kafka only allows lowercase target table and column names.
  • GDS-Kafka deletes historical data based on pos in the extended field. If imported data involves the delete operation, the extended field must be used.

Purchasing a GDS-Kafka Instance

To use the data migration feature, you need to purchase a GDS-kafka instance (cluster). Cluster instances provide secure and reliable data migration services. Clusters are isolated from each other.

Constraints

  • Currently, only standalone clusters are supported.
  • Only the pay-per-use billing mode is supported.

Procedure

  1. Log in to the GaussDB(DWS) console.
  2. In the navigation pane, choose Data > Data Integration > Instances.
  3. In the upper right corner of the page, click Buy GDS-Kafka Instance. Configure cluster parameters.

    Table 1 Parameter description

    Parameter

    Description

    Example Value

    CPU Architecture

    The following CPU architectures can be selected:

    • x86
    • Kunpeng
    NOTE:

    The only difference between the x86 and Kunpeng architectures lies in the underlying architecture, of which the application layer is unaware. The same SQL syntax is used. If x86 servers are sold out when you create a cluster, select the Kunpeng architecture.

    x86

    Flavor

    Select a node flavor.

    -

    Capacity

    Storage capacity of a node.

    -

    Current Flavor

    Current flavor of the cluster.

    -

    Name

    Set the name of the data warehouse cluster.

    Enter 4 to 64 characters. Only case-insensitive letters, digits, hyphens (-), and underscores (_) are allowed. The value must start with a letter. Letters are not case-sensitive.

    -

    Version

    Version of the database instance installed in the cluster. The version in the screenshot is for reference only.

    8.2.1.300

    VPC

    Specify a VPC to isolate the cluster's network.

    If you create a data warehouse cluster for the first time and have not configured the VPC, click View VPC. On the VPC management console that is displayed, create a VPC as needed.

    -

    Subnet

    Specify a VPC subnet.

    A subnet provides dedicated network resources that are isolated from other networks, improving network security.

    -

    Security Group

    Specify a VPC security group.

    A security group restricts access rules to enhance security when GaussDB(DWS) and other services access each other.

    -

    EIP

    Specify whether users can use a client to connect to a cluster's database over the Internet. The following methods are supported:

    • Do not use: Do not specify any EIPs here. If GaussDB(DWS) is used in the production environment, first bind it to ELB, and then bind it to an EIP on the ELB page.
    • Automatically assign: Specify bandwidth for EIPs, and the system will automatically assign EIPs with dedicated bandwidth to clusters. You can use the EIPs to access the clusters over the Internet. The bandwidth name of an automatically assigned EIP starts with the cluster name.
    • Specify: Specify an EIP to be bound to the cluster. If no available EIPs are displayed in the drop-down list, click Create EIP to go to the Elastic IP page and create an EIP as needed. The bandwidth can be customized.

    -

    Enterprise Project

    Select the enterprise project of the cluster. You can configure this parameter only when the Enterprise Project Management service is enabled. The default value is default.

    default

  4. Confirm the information and click Submit.

Viewing Instance Details

On the instance details page, you can view the basic information and network information about the cluster.

Procedure

  1. Log in to the GaussDB(DWS) console.
  2. In the navigation pane, choose Data > Data Integration > Instances.
  3. Click the name of an instance to go to the instance details page.

    Figure 1 Viewing Instance Details

Message Formats Supported by GDS-Kafka

Table 2 Message formats supported by GDS-Kafka

Kafka Source Event Type

Format

Description

cdc.drs.avro

Internal format of Huawei Cloud DRS. DRS generates data in the avro format used by Kafka. GDS-Kafka can directly interconnect with DRS to parse and import the data.

None

drs.cdc

To use the avro format for drs.cdc, specify the Maven dependency of GDS-Kafka-common and GDS-Kafka-source in the upstream programs of Kafka, and then create and fill in the Record object. A Record object represents a table record. It will be serialized into a byte[] array, produced and sent to Kafka, and used by the downstream GDS-Kafka.

In the following example, the target table is the person table in the public schema. The person table consists of the id, name, and age fields. The op_type is U, which indicates an update operation. This example changes the name field from a to b in the record with the ID 0, and changes the value of the age field from 18 to 20.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Record record = new Record();
// Set the schema and table name of the target table.
record.setTableName("public.person");
// Set the field list.
List<Field> fields = new ArrayList<>();
fields.add(new Field("id", 0));
fields.add(new Field("name", 1));
fields.add(new Field("age", 2));
record.setFields(fields);
// Set the field value list before the table record is updated.
List<Object> before = new ArrayList<>();
before.add(new Integer(0, "0"));
before.add(new Character("utf-8", ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8))));
before.add(new Integer(0, "18"));
record.setBeforeImages(before);
// Set the field value list after the table record is updated.
List<Object> after = new ArrayList<>();
after.add(new Integer(0, "0"));
after.add(new Character("utf-8", ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8))));
after.add(new Integer(0, "20"));
record.setAfterImages(after);
// Set the operation type.
record.setOperation("U");
// Set the operation time.
record.setUpdateTimestamp(325943905);
// Serialize the record object into a byte[] array.
byte[] msg = Record.getEncoder().encode(record).array();

Standard avro format:

  • The tableName field is used to describe the target table and schema names that the current record belongs to. [Mandatory]
  • The operation field is used to describe the operation type of the current record. I indicates insert, U indicates update, and D indicates deletion. [Mandatory]
  • updateTimestamp indicates the time when an operation is performed on the source end. [Optional]
  • The beforeImages list describes the information before the current record is updated or deleted. The fields in the before body correspond to those in the target table. [Mandatory for U/D]
  • The afterImages list describes the updated or newly inserted information of the current record. [Mandatory for U/D]
  • The fields list describes the field list of the current table record. The index values of the fields must be in the same sequence as those in beforeImage and afterImage. [Mandatory]

cdc.json

In the following example, the target table is the person table in the public schema. The person table consists of the id, name, and age fields. The op_type is U, which indicates an update operation. This example changes the name field from a to b in the record with the ID 1, and changes the value of the age field from 18 to 20.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
{
"table": "public.person",
"op_type": "U",
"op_ts": "1668426344",
"current_ts": "1668426344",
"before": {
"id":"1",
"name":"a",
"age": 18
},
"after": {
"id":"1",
"name":"b",
"age": 20
}
}

Standard JSON format:

  • The table field describes the target table and schema names that the current record belongs to. [Mandatory]
  • The op_type field is used to describe the operation type of the current record. I indicates insert, U indicates update, and D indicates deletion. [Mandatory]
  • op_ts indicates the time when an operation is performed on the source end. [Optional]
  • current_ts indicates the time when a message is imported to Kafka. [Optional]
  • The before object describes the information before the current record is updated or deleted. The fields in the before body correspond to those in the target table. [Mandatory for U/D]
  • The after object list describes the update or newly inserted information of the current record. [Mandatory for U/D]

industrial.iot.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"working_time":"10000"
},
}

IoT data format:

  • thing_model_name in header indicates the table name. [Mandatory]
  • The values of thing_id, instance_id, and timestamp in header and the content in the body comprise the fields of the current record.
  • IoT data is time series data and does not involve update or deletion. Only insert operations are involved.

industrial.iot.recursion.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"property":{
  "key1":"1",
  "key2":2
},
"working_time":"10000"
},
}

IoT data format:

  • thing_model_name in header indicates the table name. [Mandatory]
  • The values of thing_id, instance_id, and timestamp in header and the content in the body comprise the fields of the current record.
  • IoT data is time series data and does not involve update or deletion. Only insert operations are involved.
  • In this data format, the key and value of body are added to the property and value fields in the new format to generate multiple pieces of new data. In this way, rows are converted to columns.

industrial.iot.event.json.independent.table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2
    }
}

IoT event stream data format:

  • event_name indicates a table name. [Mandatory]
  • event_id, start_time, end_time, and fields comprise the field content of a record. [Mandatory]
  • IoT event stream data is time series data and does not involve update or deletion. Only insert operations are involved.

industrial.iot.json.multi.events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2,
    "field3":{
       "key1":"1",
       "key2":2
       }
    }
}

IoT event stream data format:

  • event_name indicates a table name. [Mandatory]
  • event_id, start_time, end_time, and fields comprise the field content of a record. [Mandatory]
  • IoT event stream data is time series data and does not involve update or deletion. Only insert operations are involved.
  • In this data format, the key and value of fields are added to the field_name and field_value fields in the new format to generate multiple pieces of new data. In this way, rows are converted to columns.

GDS-Kafka Import Modes

To import GDS-Kafka data to the database, copy the data to a temporary table, and then merge or insert the data. The following table describes their usage and scenarios.

Table 3 GDS-Kafka import modes

Operation

Direct Insertion

Primary Key Table

Import Mode

insert

true (only for tables without primary keys)

No

Use INSERT SELECT to write data from the temporary table to the target table.

false

Yes

Merge data from the temporary table to the target table based on the primary key.

No

Use INSERT SELECT to write data from the temporary table to the target table.

delete

true (only for tables without primary keys)

No

Use INSERT SELECT to write data from the temporary table to the target table.

false

NOTE:

You can mark deletion by configuring the app.del.flag parameter. The flag of a deleted record will be set to 1.

Yes

  • If the delflag field is set, merge will be performed based on the primary key. If a matched primary key is found, and the value of pos in the target table is smaller than that in the temporary table, the delflag field will be set to 1. Otherwise, a new record will be inserted.
  • If the delflag field is not set, a matched primary key is found, and the value of pos in the target table is smaller than that in the temporary table, the record will be deleted from the target table.

No

  • If the delflag field is set, all the fields in the temporary table will be used to match and merge with the target table. If a matched record is found, and the value of pos in the target table is smaller than that in the temporary table, the delflag field will be set to 1. Otherwise, a new record will be inserted.
  • If the delflag field is not set, all the fields in the temporary table will be used to match the target table. If a matched record is found, and the value of pos in the target table is smaller than that in the temporary table, the matched record will be deleted from the target table.

update

true (only for tables without primary keys)

No

Use INSERT SELECT to write data from the temporary table to the target table.

false

NOTE:

The update operation is split. The message in before or beforeImage is processed as a delete operation, and the message in after or afterImage is processed as an insert operation. Then, the message is saved to the database based on the insert and delete operations.

Yes

Equivalent to the insert+delete operation on a table with a primary key.

No

Equivalent to the insert+delete operation on a table without a primary key.