Acesso a dados de GDS-Kafka
O GDS-Kafka consome e armazena em cache dados do Kafka. Se o tempo ou o tamanho do cache de dados atingir um limite pré-configurado, o GDS-Kafka copiará os dados para uma tabela temporária do GaussDB(DWS) e, em seguida, inserirá ou atualizará os dados na tabela temporária.
- O formato dos dados gerados pelo produtor de mensagens do Kafka é especificado pelo parâmetro kafka.source.event.type. Para obter detalhes, consulte Formatos de mensagem suportados pelo GDS-Kafka.
- No GDS-Kafka, você pode inserir dados diretamente para tabelas sem chaves primárias ou atualizar dados mesclando. A inserção direta pode alcançar um melhor desempenho, porque não envolve operações de atualização. Determine seu modo de atualização com base no tipo de tabela de destino em GaussDB(DWS). O modo de importação de dados é determinado pelo parâmetro app.insert.directly e se existe uma chave primária. Para obter detalhes, consulte Modos de importação de dados do GDS-Kafka.
- O GDS-kafka só permite nomes de tabela e coluna de destino em minúsculas.
- O GDS-Kafka apaga os dados históricos com base no pos no campo estendido. Se os dados importados envolverem a operação de exclusão, o campo estendido deve ser usado.
Formatos de mensagem suportados pelo GDS-Kafka
kafka.source.event.type |
Formato |
Descrição |
||
---|---|---|---|---|
cdc.drs.avro |
Formato interno do DRS da Huawei Cloud. O DRS gera dados no formato avro usado pelo Kafka. O GDS-Kafka pode interconectar diretamente com o DRS para analisar e importar os dados. |
Nenhuma |
||
drs.cdc |
Para usar o formato avro para drs.cdc, especifique a dependência Maven de GDS-Kafka-common e GDS-Kafka-source nos programas upstream do Kafka e, em seguida, crie e preencha o objeto Record. Um objeto Record representa um registro de tabela. Ele será serializado em uma matriz de byte[], produzido e enviado para o Kafka e usado pelo downstream de GDS-Kafka. No exemplo a seguir, a tabela de destino é a tabela person no esquema public. A tabela person consiste nos campos id, name e age. O op_type é U, que indica uma operação de atualização. Este exemplo altera o campo name de a para b no registro com o ID 0, e altera o valor do campo age de 18 para 20.
|
Formato avro padrão:
|
||
cdc.json |
No exemplo a seguir, a tabela de destino é a tabela person no esquema public. A tabela person consiste nos campos id, name e age. O op_type é U, que indica uma operação de atualização. Este exemplo altera o campo name de a para b no registro com o ID 1, e altera o valor do campo age de 18 para 20.
|
Formato JSON padrão:
|
||
industrial.iot.json |
|
Formato de dados da IoT:
|
||
industrial.iot.recursion.json |
|
Formato de dados da IoT:
|
||
industrial.iot.event.json.independent.table |
|
Formato de dados de fluxo de eventos IoT:
|
||
industrial.iot.json.multi.events |
|
Formato de dados de fluxo de eventos IoT:
|
Modos de importação do GDS-Kafka
Para importar dados do GDS-Kafka para o banco de dados, copie os dados para uma tabela temporária e, em seguida, mescle ou insira os dados. A tabela a seguir descreve seu uso e cenários.
Operação |
app.insert.directly |
Tabela de chave primária |
Modo de importação |
---|---|---|---|
insert |
true (somente para tabelas sem chaves primárias) |
Não |
Use INSERT SELECT para gravar dados da tabela temporária na tabela de destino. |
false |
Sim |
Mescle dados da tabela temporária para a tabela de destino com base na chave primária. |
|
Não |
Use INSERT SELECT para gravar dados da tabela temporária na tabela de destino. |
||
delete |
true (somente para tabelas sem chaves primárias) |
Não |
Use INSERT SELECT para gravar dados da tabela temporária na tabela de destino. |
false
NOTA:
Você pode marcar a exclusão configurando o parâmetro app.del.flag. A bandeira de um registro excluído será definida como 1. |
Sim |
|
|
Não |
|
||
update |
true (somente para tabelas sem chaves primárias) |
Não |
Use INSERT SELECT para gravar dados da tabela temporária na tabela de destino. |
false
NOTA:
A operação de atualização é dividida. A mensagem em before ou beforeImage é processada como uma operação de exclusão, e a mensagem em after ou afterImage é processada como uma operação de inserção. Em seguida, a mensagem é salva no banco de dados com base nas operações de inserção e exclusão. |
Sim |
Equivalente à operação insert+delete em uma tabela com uma chave primária. |
|
Não |
Equivalente à operação insert+delete em uma tabela sem uma chave primária. |