Uso de trabalhos de Flink de DLI para gravar dados do Kafka para GaussDB(DWS) em tempo real
Esta prática demonstra como usar trabalhos do Flink de DLI para sincronizar dados de consumo do Kafka para o GaussDB(DWS) em tempo real. O processo de demonstração inclui gravar e atualizar os dados existentes em tempo real.
- Para obter detalhes, consulte O que é o Data Lake Insight?
- Para obter detalhes sobre o Kafka, consulte O que é o DMS for Kafka?
Essa prática leva cerca de 90 minutos. Os serviços de nuvem usados nessa prática incluem Virtual Private Cloud (VPC) e sub-redes, Elastic Load Balance (ELB), Elastic Cloud Server (ECS), Object Storage Service (OBS), Distributed Message Service (DMS) for Kafka, Data Lake Insight (DLI) e Data Warehouse Service (DWS). O processo básico é o seguinte:
- Preparações
- Passo 1: criar uma instância do Kafka
- Passo 2: criar um cluster do GaussDB(DWS) e uma tabela de destino
- Passo 3: criar uma fila de DLI
- Passo 4: criar uma conexão de origem de dados avançada para Kafka e GaussDB(DWS)
- Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink
- Passo 6: criar e editar um trabalho do Flink de DLI
- Passo 7: criar e modificar mensagens no cliente do Kafka
Descrição do cenário
Suponha que os dados de exemplo da fonte de dados de Kafka é uma tabela de informações do usuário, como mostrado em Tabela 1, que contém os campos id, name e age. O campo id é único e fixo, que é compartilhado por vários sistemas de serviço. Geralmente, o campo id não precisa ser modificado. Somente os campos name e age precisam ser modificados.
Use o Kafka para gerar os três grupos de dados a seguir e use os trabalhos de Flink de DLI para sincronizar os dados com o GaussDB(DWS): Altere os usuários cujos IDs são 2 e 3 para jim e tom e use os trabalhos de Flink de DLI para atualizar dados e sincronizar os dados com o GaussDB(DWS).
Restrições
- Certifique-se de que VPC, ECS, OBS, Kafka, DLI e GaussDB(DWS) estejam na mesma região, por exemplo, China-Hong Kong.
- Certifique-se de que Kafka, DLI e GaussDB(DWS) possam se comunicar uns com os outros. Nesta prática, Kafka e GaussDB(DWS) são criados na mesma região e VPC, e os grupos de segurança de Kafka e GaussDB(DWS) permitem o segmento de rede das filas de DLI.
- Para garantir que a ligação entre DLI e DWS é estável, vincule o serviço ELB para o cluster de armazém de dados criado.
Preparativos
- Você registrou uma conta da Huawei e ativou os serviços da Huawei Cloud.. Antes de usar o GaussDB(DWS), verifique o status da conta. A conta não pode estar em atraso ou congelada.
- Você criou uma VPC e uma sub-rede. Para obter detalhes, consulte Criação de uma VPC.
Passo 1: criar uma instância do Kafka
- Faça logon no console de gerenciamento do Huawei Cloud e escolha Middleware > Distributed Message Service (for Kafka) na lista de serviços. O console de gerenciamento do Kafka é exibido.
- Clique em DMS for Kafka à esquerda e clique em Buy Instance no canto superior direito.
- Defina os seguintes parâmetros. Retém os valores padrão para outros parâmetros que não estão descritos na tabela.
Tabela 2 Parâmetros de instância do Kafka Parâmetro
Valor
Billing Mode
Pay-per-use
Region
CN-Hong Kong
Project
Default
AZ
AZ 1 (Se não estiver disponível, selecione outra AZ.)
Instance Name
kafka-dli-dws
Enterprise Project
default
Specifications
Default
Version
2.7
CPU Architecture
x86
Broker Flavor
kafka.2u4g.cluster.small (apenas para referência. Selecione o menor flavor.)
Brokers
3
VPC
Selecione uma VPC criada. Se nenhuma VPC estiver disponível, crie uma.
Security Group
Selecione um grupo de segurança criado. Se nenhum grupo de segurança estiver disponível, crie um.
Other parameters
Mantenha o valor padrão.
Figura 2 Criar uma instância do Kafka
- Clique em Buy e conclua o pagamento. Espere até que a criação seja bem sucedida.
- Na lista de instâncias do Kafka, clique no nome da instância criada do Kafka. A página Basic Information é exibida.
- Escolha Topics à esquerda e clique em Create Topic.
Defina Topic Name como topic-demo e mantenha os valores padrão para outros parâmetros.
Figura 3 Criação de um tópico
- Clique em OK. Na lista de tópicos, você pode ver que o topic-demo foi criado com sucesso.
- Escolha Consumer Groups à esquerda e clique em Create Consumer Group.
- Insira kafka01 para Consumer Group Name e clique em OK.
Passo 2: criar um cluster do GaussDB(DWS) e uma tabela de destino
- Crie um balanceador de carga dedicado, defina Network Type como IPv4 private network. Defina Region e VPC com os mesmos valores da instância do Kafka. Neste exemplo, defina Region como China-Hong Kong.
- Criação de um cluster. Para garantir a conectividade de rede, a região e a VPC do cluster de GaussDB(DWS) devem ser as mesmas da instância do Kafka. Nesta prática, a região e a VPC são China-Hong Kong.. A VPC deve ser a mesma que a criada para o Kafka.
- Na página Clusters do console do GaussDB(DWS), localize a linha que contém o cluster de destino e clique em Login na coluna Operation.
Esta prática usa a versão 8.1.3.x como exemplo. 8.1.2 e versões anteriores não suportam este modo de logon. Você pode usar o Data Studio para se conectar a um cluster. Para obter detalhes, consulte Uso do Data Studio para se conectar a um cluster.
- O nome de usuário de logon é dbadmin, o nome do banco de dados é gaussdb e a senha é a senha do usuário dbadmin definida durante a criação do cluster do armazém de dados. Selecione Remember Password, ative Collect Metadata Periodically e Show Executed SQL Statements e clique em Log In.
Figura 4 Fazer login no GaussDB(DWS)
- Clique no nome do banco de dados gaussdb e clique em SQL Window no canto superior direito para acessar o editor SQL.
- Copie a seguinte instrução SQL. Na janela SQL, clique em Execute SQL para criar a tabela de destino user_dws.
1 2 3 4 5 6
CREATE TABLE user_dws ( id int, name varchar(50), age int, PRIMARY KEY (id) );
Passo 3: criar uma fila de DLI
- Faça logon no console de gerenciamento da Huawei Cloud e escolha Analytics > Data Lake Insight na lista de serviços. O console de gerenciamento do DLI é exibido.
- No painel de navegação à esquerda, escolha Resource Management > Queue Manager.
- Clique em Buy Queue no canto superior direito, defina os seguintes parâmetros e mantenha os valores padrão para outros parâmetros que não estão descritos na tabela.
Tabela 3 Parâmetros da fila de DLI Parâmetro
Valor
Billing Mode
Pay-per-use
Region
CN-Hong Kong
Project
Default
Name
dli_dws
Type
Para uma fila geral, selecione Dedicated Resource Mode.
AZ Mode
Single-AZ deployment
Specifications
16 CUs
Enterprise Project
default
Advanced Settings
Custom
CIDR Block
172.16.0.0/18. Ele deve estar em um segmento de rede diferente do Kafka e do GaussDB(DWS). Por exemplo, se Kafka e GaussDB(DWS) estiverem no segmento de rede 192.168.x.x, selecione 172.16.x.x para DLI.
Figura 5 Criar uma fila do DLI
- Clique em Buy.
Passo 4: criar uma conexão de origem de dados avançada para Kafka e GaussDB(DWS)
- No grupo de segurança do Kafka, permita o segmento de rede onde a fila do DLI está localizada.
- Retorne ao console do Kafka e clique no nome da instância do Kafka para acessar a página Basic Information. Visualize o valor de Instance Address (Private Network) nas informações de conexão e registre o endereço para uso futuro.
Figura 6 Endereço de rede privada do Kafka
- Clique no nome do grupo de segurança.
Figura 7 Grupo de segurança do Kafka
- Escolha Inbound Rules > Add Rule, conforme mostrado na figura a seguir. Adicione o segmento de rede da fila do DLI. Neste exemplo, o segmento de rede é 172.16.0.0/18. Assegure-se de que o segmento de rede seja o mesmo que aquele entrado durante Passo 3: criar uma fila de DLI.
Figura 8 Adicionar regras ao grupo de segurança do Kafka
- Clique em OK.
- Retorne ao console do Kafka e clique no nome da instância do Kafka para acessar a página Basic Information. Visualize o valor de Instance Address (Private Network) nas informações de conexão e registre o endereço para uso futuro.
- Retorne ao console de gerenciamento do DLI, clique em Datasource Connections à esquerda, selecione Enhanced e clique em Create.
- Defina os seguintes parâmetros. Retém os valores padrão para outros parâmetros que não estão descritos na tabela.
Tabela 4 Conexão de DLI para Kafka Parâmetro
Valor
Connection Name
dli_kafka
Resource Pool
Selecione a fila do DLI criada dli_dws.
VPC
Selecione a VPC do Kafka.
Subnet
Selecione a sub-rede onde o Kafka está localizado.
Other parameters
Mantenha o valor padrão.
Figura 9 Criar uma conexão aprimorada
- Clique em OK. Aguarde até que a conexão do Kafka seja criada com êxito.
- Escolha Resources > Queue Management à esquerda e escolha More > Test Address Connectivity à direita de dli_dws.
- Na caixa endereço, digite o endereço IP privado e o número da porta da instância do Kafka obtida em 1.a. (Há três endereços de Kafka. Digite apenas um deles.)
Figura 10 Testar a conectividade do Kafka
- Clique em Test para verificar se o DLI está conectado com êxito ao Kafka.
- Faça logon no console de gerenciamento do GaussDB(DWS), escolha Clusters à esquerda e clique no nome do cluster para ir para a página de detalhes.
- Registre o nome do domínio da rede privada, o número da porta e o endereço do Elastic Load Balance do cluster do armazém de dados para uso futuro.
Figura 11 Nome de domínio privado e endereço do ELB
- Clique no nome do grupo de segurança.
Figura 12 Grupo de segurança do GaussDB(DWS)
- Escolha Inbound Rules > Add Rule, conforme mostrado na figura a seguir. Adicione o segmento de rede da fila do DLI. Neste exemplo, o segmento de rede é 172.16.0.0/18. Assegure-se de que o segmento de rede seja o mesmo que aquele entrado durante Passo 3: criar uma fila de DLI.
Figura 13 Adicionar uma regra ao grupo de segurança do GaussDB(DWS)
- Clique em OK.
- Alterne ao console do DLI, escolha Resources > Queue Management à esquerda e clique More > Test Address Connectivity à direita de dli_dws.
- Na caixa endereço, digite o endereço IP do Elastic Load Balance e o número da porta do cluster do GaussDB(DWS) obtido em 9.
Figura 14 Testar a conectividade do GaussDB(DWS)
- Clique em Test para verificar se o DLI está conectado com êxito ao GaussDB(DWS).
Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink
dws-connector-flink é uma ferramenta para interconexão com Flink baseada em APIs de JDBC do DWS. Durante a configuração do trabalho do DLI, essa ferramenta e suas dependências são armazenadas no diretório de carregamento da classe do Flink para melhorar a capacidade de importar trabalhos do Flink para GaussDB(DWS).
- Acesse https://mvnrepository.com/artifact/com.huaweicloud.dws em um navegador.
- Na lista de softwares, selecione a versão mais recente do GaussDB(DWS) Connectors Flink. Nesta prática, selecione DWS Connector Flink 2 12 1 12.
- Clique na ramificação 1.0.4. (Clique na ramificação mais recente em cenários reais).
- Clique em View All.
- Clique em dws-connector-flink_2.12_1.12-1.0.4-jar-with-dependencies.jar para fazer o download para o host local.
- Crie um bucket do OBS. Nesta prática, defina o nome do bucket como obs-flink-dws e faça upload do arquivo para o bucket do OBS. Certifique-se de que o bucket esteja na mesma região que o DLI, que nesta prática é China-Hong Kong.
Figura 15 Fazer upload do pacote JAR para o bucket do OBS
Passo 6: criar e editar um trabalho do Flink de DLI
- Retorne ao console de gerenciamento do DLI, escolha Job Management > Flink Jobs à esquerda e clique em Create Job no canto superior direito.
- Defina Type para Flink OpenSource SQL e Name para kafka-dws.
Figura 16 Criare um trabalho
- Clique em OK. A página para edição do trabalho é exibida.
- Defina os seguintes parâmetros à direita da página. Retém os valores padrão para outros parâmetros que não estão descritos na tabela.
Tabela 5 Parâmetros do trabalho do Flink Parâmetro
Valor
Queue
dli_dws
Flink Version
1.12
UDF Jar
Selecione o arquivo JAR no bucket do OBS criado em Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink.
OBS Bucket
Selecione o bucket criado em Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink.
Enable Checkpointing
Verifique a caixa.
Other parameters
Mantenha o valor padrão.
Figura 17 Editar um trabalho
- Copie o seguinte código SQL para a janela de código SQL à esquerda.
Obtenha o endereço IP privado e o número da porta da instância do Kafka de 1.a e obtenha o nome de domínio privado de 9.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
CREATE TABLE user_kafka ( id string, name string, age int ) WITH ( 'connector' = 'kafka', 'topic' = 'topic-demo', 'properties.bootstrap.servers' ='Private IP address and port number of the Kafka instance', 'properties.group.id' = 'kafka01', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE user_dws ( id string, name string, age int, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url'='jdbc:postgresql://GaussDB(DWS) private network domain name:8000/gaussdb', 'tableName' = 'public.user_dws', 'username' = 'dbadmin', 'password' ='Password of database user dbdamin' ); insert into user_dws select * from user_kafka;
- Clique em Check Semantics e aguarde até que a verificação seja bem-sucedida.
Se a verificação falhar, verifique se a entrada SQL tem erros de sintaxe.
Figura 18 Instrução SQL de um trabalho
- Clique em Save.
- Volte para a home page do console do DLI e escolha Job Management > Flink Jobs à esquerda.
- Clique em Start à direita do nome do trabalho kafka-dws e clique em Start Now.
Aguarde cerca de 1 minuto e atualize a página. Se o status for Running, o trabalho será executado com êxito.
Figura 19 Status de execução do trabalho
Passo 7: criar e modificar mensagens no cliente do Kafka
- Crie um ECS consultando o documento do ECS. Certifique-se de que a região e o VPC do ECS sejam iguais aos do Kafka.
- Instale o JDK.
- Faça logon no ECS, vá para o diretório /usr/local e faça download do pacote JDK.
1 2
cd /usr/local wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
- Descompacte o pacote JDK baixado.
1
tar -zxvf jdk-17_linux-x64_bin.tar.gz
- Execute o seguinte comando para abrir o arquivo /etc/profile:
1
vim /etc/profile
- Pressione i para entrar no modo de edição e adicione o seguinte conteúdo ao final do arquivo /etc/profile:
1 2 3 4 5
export JAVA_HOME=/usr/local/jdk-17.0.7 #JDK installation directory export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin export PATH=$PATH:${JAVA_PATH}
- Pressione Esc e insira :wq! para salvar a configuração e sair.
- Execute o seguinte comando para que as variáveis de ambiente entrem em vigor:
1
source /etc/profile
- Execute o seguinte comando. Se as seguintes informações forem exibidas, o JDK será instalado com sucesso:
1
java -version
- Faça logon no ECS, vá para o diretório /usr/local e faça download do pacote JDK.
- Instale o cliente do Kafka.
- Vá para o diretório /opt e execute o seguinte comando para obter o pacote de software cliente do Kafka.
1 2
cd /opt wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- Descompacte o pacote de software baixado.
1
tar -zxf kafka_2.12-2.7.2.tgz
- Vá para o diretório do cliente do Kafka.
1
cd /opt/kafka_2.12-2.7.2/bin
- Vá para o diretório /opt e execute o seguinte comando para obter o pacote de software cliente do Kafka.
- Execute o seguinte comando para se conectar ao Kafka: {Connection address} indica o endereço de conexão de rede interna do Kafka. Para obter detalhes sobre como obter o endereço, consulte 1.a. topic indica o nome do tópico do Kafka criado em 6.
1
./kafka-console-producer.sh --broker-list {connection address} --topic {Topic name}
O seguinte é um exemplo:
./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo
Se > for exibido e nenhuma outra mensagem de erro for exibida, a conexão foi bem-sucedida.
- Na janela do cliente do Kafka conectado, copie o seguinte conteúdo (uma linha por vez) com base nos dados planejados em Descrição do cenário e pressione Enter para produzir mensagens:
1 2 3
{"id":"1","name":"lily","age":"16"} {"id":"2","name":"lucy","age":"17"} {"id":"3","name":"lilei","age":"15"}
- Retorne ao console do GaussDB(DWS), escolha Clusters à esquerda e clique em Log In à direita do cluster do GaussDB(DWS). A página SQL é exibida.
- Execute a seguinte instrução SQL. Você pode descobrir que os dados são salvos com sucesso no banco de dados em tempo real.
1
SELECT * FROM user_dws ORDER BY id;
- Volte para a janela do cliente para se conectar ao Kafka no ECS, copie o conteúdo a seguir (uma linha por vez) e pressione Enter para produzir mensagens.
1 2
{"id":"2","name":"jim","age":"17"} {"id":"3","name":"tom","age":"15"}
- Volte para a janela SQL aberta do GaussDB(DWS) e execute a seguinte instrução SQL. Verificou-se que os nomes cujos IDs são 2 e 3 foram alterados para jim e tom.
A descrição do cenário é como esperado. Fim dessa prática.
1
SELECT * FROM user_dws ORDER BY id;