Estos contenidos se han traducido de forma automática para su comodidad, pero Huawei Cloud no garantiza la exactitud de estos. Para consultar los contenidos originales, acceda a la versión en inglés.
Actualización más reciente 2024-06-12 GMT+08:00

Acceso a datos de GDS-Kafka

GDS-Kafka consume y almacena en caché datos de Kafka. Si el tiempo o el tamaño de la caché de datos alcanza un umbral preconfigurado, GDS-Kafka copiará los datos a una tabla temporal de GaussDB(DWS) y, a continuación, insertará o actualizará los datos en la tabla temporal.

  1. El formato de los datos generados por el productor de mensajes Kafka se especifica mediante el parámetro kafka.source.event.type. Para obtener más información, consulte Formatos de mensaje Formatos de mensaje compatibles con GDS-Kafka.
  2. En GDS-Kafka, puede insertar directamente datos para tablas sin claves principales o actualizar datos mediante la fusión. La inserción directa puede lograr un mejor rendimiento, ya que no implica operaciones de actualización. Determine el modo de actualización en función del tipo de tabla de destino en GaussDB(DWS). El modo de importación de datos viene determinado por el parámetro app.insert.directly y si existe una clave principal. Para obtener más información, consulte Modos de importación de datos GDS-Kafka.
  • GDS-kafka solo permite nombres de columnas y tablas de destino en minúsculas.
  • GDS-Kafka elimina los datos históricos basados en pos en el campo extendido. Si los datos importados implican la operación de eliminación, se debe utilizar el campo extendido.

Formatos de mensaje compatibles con GDS-Kafka

Tabla 1 Formatos de mensaje compatibles con GDS-Kafka

kafka.source.event.type

Formato

Descripción

cdc.drs.avro

Formato interno de Huawei Cloud DRS. DRS genera datos en el formato avro usado por Kafka. GDS-Kafka puede interconectarse directamente con DRS para analizar e importar los datos.

Ninguno

drs.cdc

Para utilizar el formato avro para drs.cdc, especifique la dependencia Maven de GDS-Kafka-common y GDS-Kafka-source en los programas anteriores de Kafka y, a continuación, cree y rellene el objeto Record. Un objeto Record representa un registro de tabla. Se serializará en una matriz byte[], producida y enviada a Kafka, y utilizada por el GDS-Kafka descendente.

En el siguiente ejemplo, la tabla de destino es la tabla person del esquema public. La tabla person consta de los campos id, name y age. El op_type es de U lo que indica una operación de actualización. En este ejemplo se cambia el campo name de a a b en el registro con el ID 0 y se cambia el valor del campo age de 18 a 20.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Record record = new Record();
// Set the schema and table name of the target table.
record.setTableName("public.person");
// Set the field list.
List<Field> fields = new ArrayList<>();
fields.add(new Field("id", 0));
fields.add(new Field("name", 1));
fields.add(new Field("age", 2));
record.setFields(fields);
// Set the field value list before the table record is updated.
List<Object> before = new ArrayList<>();
before.add(new Integer(0, "0"));
before.add(new Character("utf-8", ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8))));
before.add(new Integer(0, "18"));
record.setBeforeImages(before);
// Set the field value list after the table record is updated.
List<Object> after = new ArrayList<>();
after.add(new Integer(0, "0"));
after.add(new Character("utf-8", ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8))));
after.add(new Integer(0, "20"));
record.setAfterImages(after);
// Set the operation type.
record.setOperation("U");
// Set the operation time.
record.setUpdateTimestamp(325943905);
// Serialize the record object into a byte[] array.
byte[] msg = Record.getEncoder().encode(record).array();

Formato avro estándar:

  • El campo tableName se utiliza para describir la tabla de destino y los nombres de esquema a los que pertenece el registro actual. [Obligatorio]
  • El campo operation se utiliza para describir el tipo de operación del registro actual. I indica inserción, U indica actualización y D indica eliminación. [Obligatorio]
  • updateTimestamp indica la hora en que se realiza una operación en el extremo de origen. [Opcional]
  • La lista beforeImages describe la información antes de actualizar o eliminar el registro actual. Los campos del before body corresponden a los de la tabla de destino. [Obligatorio para U/D]
  • La lista afterImages describe la información actualizada o recién insertada del registro actual. [Obligatorio para U/D]
  • La lista fields describe la lista de campos del registro de tabla actual. El índice values de los campos debe estar en la misma secuencia que los de beforeImage y afterImage. [Obligatorio]

cdc.json

En el siguiente ejemplo, la tabla de destino es la tabla person del esquema public. La tabla person consta de los campos id, name y age. El op_type es de U lo que indica una operación de actualización. En este ejemplo se cambia el campo name de a a b en el registro con el ID 1 y se cambia el valor del campo age de 18 a 20.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
{
"table": "public.person",
"op_type": "U",
"op_ts": "1668426344",
"current_ts": "1668426344",
"before": {
"id":"1",
"name":"a",
"age": 18
},
"after": {
"id":"1",
"name":"b",
"age": 20
}
}

Formato JSON estándar:

  • El campo table describe la tabla de destino y los nombres de esquema a los que pertenece el registro actual. [Obligatorio]
  • El campo op_type se utiliza para describir el tipo de operación del registro actual. I indica inserción, U indica actualización y D indica eliminación. [Obligatorio]
  • op_ts indica la hora en que se realiza una operación en el extremo de origen. [Opcional]
  • current_ts indica la hora a la que se importa un mensaje a Kafka. [Opcional]
  • El objeto before describe la información antes de actualizar o eliminar el registro actual. Los campos del before body corresponden a los de la tabla de destino. [Obligatorio para U/D]
  • La lista de objetos after describe la actualización o la información recién insertada del registro actual. [Obligatorio para U/D]

industrial.iot.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"working_time":"10000"
},
}

Formato de datos de IoT:

  • thing_model_name en header indica el nombre de tabla. [Obligatorio]
  • Los valores de thing_id, instance_id y timestamp en header y el contenido en el cuerpo comprenden los campos del registro actual.
  • Los datos de IoT son datos de series temporales y no implican actualización o eliminación. Sólo se involucran las operaciones de inserción.

industrial.iot.recursion.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"property":{
  "key1":"1",
  "key2":2
},
"working_time":"10000"
},
}

Formato de datos de IoT:

  • thing_model_name en header indica el nombre de tabla. [Obligatorio]
  • Los valores de thing_id, instance_id y timestamp en header y el contenido en el cuerpo comprenden los campos del registro actual.
  • Los datos de IoT son datos de series temporales y no implican actualización o eliminación. Sólo se involucran las operaciones de inserción.
  • En este formato de datos, la clave y el valor de body se añaden a los campos property y value en el nuevo formato para generar múltiples piezas de datos nuevos. De esta manera, las filas se convierten en columnas.

industrial.iot.event.json.independent.table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2
    }
}

Formato de datos de flujo de eventos de IoT:

  • event_name indica un nombre de tabla. [Obligatorio]
  • event_id, start_time, end_time y fields comprenden el contenido de campo de un registro. [Obligatorio]
  • Los datos de flujo de eventos IoT son datos de series temporales y no implican actualización ni eliminación. Sólo se involucran las operaciones de inserción.

industrial.iot.json.multi.events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2,
    "field3":{
       "key1":"1",
       "key2":2
       }
    }
}

Formato de datos de flujo de eventos de IoT:

  • event_name indica un nombre de tabla. [Obligatorio]
  • event_id, start_time, end_time y fields comprenden el contenido de campo de un registro. [Obligatorio]
  • Los datos de flujo de eventos IoT son datos de series temporales y no implican actualización ni eliminación. Sólo se involucran las operaciones de inserción.
  • En este formato de datos, la clave y el valor de fields se añaden a los campos field_name y field_value en el nuevo formato para generar múltiples piezas de datos nuevos. De esta manera, las filas se convierten en columnas.

Modos de importación de GDS-Kafka

Para importar datos de GDS-Kafka a la base de datos, copie los datos en una tabla temporal y, a continuación, combine o inserte los datos. En la siguiente tabla se describe su uso y escenarios.

Tabla 2 Modos de importación de GDS-Kafka

Operación

app.insert.directly

Tabla de claves primarias

Modo de importación

insert

true (solo para tablas sin claves primarias)

No

Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino.

false

Combinar datos de la tabla temporal a la tabla de destino en función de la clave principal.

No

Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino.

delete

true (solo para tablas sin claves primarias)

No

Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino.

false

NOTA:

Puede marcar la eliminación configurando el parámetro app.del.flag. El indicador de un registro eliminado se establecerá en 1.

  • Si el campo delflag está definido, la combinación se realizará en función de la clave principal. Si se encuentra una clave primaria coincidente y el valor de pos en la tabla de destino es menor que el de la tabla temporal, el campo delflag se establecerá en 1. De lo contrario, se insertará un nuevo registro.
  • Si el campo delflag no está definido, se encuentra una clave principal coincidente y el valor de pos en la tabla de destino es menor que el de la tabla temporal, el registro se eliminará de la tabla de destino.

No

  • Si se establece el campo delflag, todos los campos de la tabla temporal se usarán para coincidir y combinar con la tabla de destino. Si se encuentra un registro coincidente y el valor de pos en la tabla de destino es menor que el de la tabla temporal, el campo delflag se establecerá en 1. De lo contrario, se insertará un nuevo registro.
  • Si el campo delflag no está definido, todos los campos de la tabla temporal se usarán para coincidir con la tabla de destino. Si se encuentra un registro coincidente y el valor de pos en la tabla de destino es menor que el de la tabla temporal, el registro coincidente se eliminará de la tabla de destino.

update

true (solo para tablas sin claves primarias)

No

Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino.

false

NOTA:

La operación de actualización está separada. El mensaje de before o beforeImage se procesa como una operación de eliminación, y el mensaje de after o afterImage se procesa como una operación de inserción. A continuación, el mensaje se guarda en la base de datos basándose en las operaciones de inserción y eliminación.

Equivalente a la operación insertar+eliminar en una tabla con una clave primaria.

No

Equivalente a la operación insertar+eliminar en una tabla sin clave primaria.