Acceso a datos de GDS-Kafka
GDS-Kafka consume y almacena en caché datos de Kafka. Si el tiempo o el tamaño de la caché de datos alcanza un umbral preconfigurado, GDS-Kafka copiará los datos a una tabla temporal de GaussDB(DWS) y, a continuación, insertará o actualizará los datos en la tabla temporal.
- El formato de los datos generados por el productor de mensajes Kafka se especifica mediante el parámetro kafka.source.event.type. Para obtener más información, consulte Formatos de mensaje Formatos de mensaje compatibles con GDS-Kafka.
- En GDS-Kafka, puede insertar directamente datos para tablas sin claves principales o actualizar datos mediante la fusión. La inserción directa puede lograr un mejor rendimiento, ya que no implica operaciones de actualización. Determine el modo de actualización en función del tipo de tabla de destino en GaussDB(DWS). El modo de importación de datos viene determinado por el parámetro app.insert.directly y si existe una clave principal. Para obtener más información, consulte Modos de importación de datos GDS-Kafka.
- GDS-kafka solo permite nombres de columnas y tablas de destino en minúsculas.
- GDS-Kafka elimina los datos históricos basados en pos en el campo extendido. Si los datos importados implican la operación de eliminación, se debe utilizar el campo extendido.
Formatos de mensaje compatibles con GDS-Kafka
kafka.source.event.type |
Formato |
Descripción |
||
---|---|---|---|---|
cdc.drs.avro |
Formato interno de Huawei Cloud DRS. DRS genera datos en el formato avro usado por Kafka. GDS-Kafka puede interconectarse directamente con DRS para analizar e importar los datos. |
Ninguno |
||
drs.cdc |
Para utilizar el formato avro para drs.cdc, especifique la dependencia Maven de GDS-Kafka-common y GDS-Kafka-source en los programas anteriores de Kafka y, a continuación, cree y rellene el objeto Record. Un objeto Record representa un registro de tabla. Se serializará en una matriz byte[], producida y enviada a Kafka, y utilizada por el GDS-Kafka descendente. En el siguiente ejemplo, la tabla de destino es la tabla person del esquema public. La tabla person consta de los campos id, name y age. El op_type es de U lo que indica una operación de actualización. En este ejemplo se cambia el campo name de a a b en el registro con el ID 0 y se cambia el valor del campo age de 18 a 20.
|
Formato avro estándar:
|
||
cdc.json |
En el siguiente ejemplo, la tabla de destino es la tabla person del esquema public. La tabla person consta de los campos id, name y age. El op_type es de U lo que indica una operación de actualización. En este ejemplo se cambia el campo name de a a b en el registro con el ID 1 y se cambia el valor del campo age de 18 a 20.
|
Formato JSON estándar:
|
||
industrial.iot.json |
|
Formato de datos de IoT:
|
||
industrial.iot.recursion.json |
|
Formato de datos de IoT:
|
||
industrial.iot.event.json.independent.table |
|
Formato de datos de flujo de eventos de IoT:
|
||
industrial.iot.json.multi.events |
|
Formato de datos de flujo de eventos de IoT:
|
Modos de importación de GDS-Kafka
Para importar datos de GDS-Kafka a la base de datos, copie los datos en una tabla temporal y, a continuación, combine o inserte los datos. En la siguiente tabla se describe su uso y escenarios.
Operación |
app.insert.directly |
Tabla de claves primarias |
Modo de importación |
---|---|---|---|
insert |
true (solo para tablas sin claves primarias) |
No |
Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino. |
false |
Sí |
Combinar datos de la tabla temporal a la tabla de destino en función de la clave principal. |
|
No |
Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino. |
||
delete |
true (solo para tablas sin claves primarias) |
No |
Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino. |
false
NOTA:
Puede marcar la eliminación configurando el parámetro app.del.flag. El indicador de un registro eliminado se establecerá en 1. |
Sí |
|
|
No |
|
||
update |
true (solo para tablas sin claves primarias) |
No |
Utilice INSERT SELECT para escribir datos de la tabla temporal a la tabla de destino. |
false
NOTA:
La operación de actualización está separada. El mensaje de before o beforeImage se procesa como una operación de eliminación, y el mensaje de after o afterImage se procesa como una operación de inserción. A continuación, el mensaje se guarda en la base de datos basándose en las operaciones de inserción y eliminación. |
Sí |
Equivalente a la operación insertar+eliminar en una tabla con una clave primaria. |
|
No |
Equivalente a la operación insertar+eliminar en una tabla sin clave primaria. |