Este conteúdo foi traduzido por máquina para sua conveniência e a Huawei Cloud não pode garantir que o conteúdo foi traduzido com precisão. Para exibir o conteúdo original, use o link no canto superior direito para mudar para a página em inglês.
Computação
Elastic Cloud Server
Bare Metal Server
Auto Scaling
Image Management Service
Dedicated Host
FunctionGraph
Cloud Phone Host
Huawei Cloud EulerOS
Redes
Virtual Private Cloud
Elastic IP
Elastic Load Balance
NAT Gateway
Direct Connect
Virtual Private Network
VPC Endpoint
Cloud Connect
Enterprise Router
Enterprise Switch
Global Accelerator
Gerenciamento e governança
Cloud Eye
Identity and Access Management
Cloud Trace Service
Resource Formation Service
Tag Management Service
Log Tank Service
Config
Resource Access Manager
Simple Message Notification
Application Performance Management
Application Operations Management
Organizations
Optimization Advisor
Cloud Operations Center
Resource Governance Center
Migração
Server Migration Service
Object Storage Migration Service
Cloud Data Migration
Migration Center
Cloud Ecosystem
KooGallery
Partner Center
User Support
My Account
Billing Center
Cost Center
Resource Center
Enterprise Management
Service Tickets
HUAWEI CLOUD (International) FAQs
ICP Filing
Support Plans
My Credentials
Customer Operation Capabilities
Partner Support Plans
Professional Services
Análises
MapReduce Service
Data Lake Insight
CloudTable Service
Cloud Search Service
Data Lake Visualization
Data Ingestion Service
GaussDB(DWS)
DataArts Studio
IoT
IoT Device Access
Outros
Product Pricing Details
System Permissions
Console Quick Start
Common FAQs
Instructions for Associating with a HUAWEI CLOUD Partner
Message Center
Segurança e conformidade
Security Technologies and Applications
Web Application Firewall
Host Security Service
Cloud Firewall
SecMaster
Anti-DDoS Service
Data Encryption Workshop
Database Security Service
Cloud Bastion Host
Data Security Center
Cloud Certificate Manager
Situation Awareness
Managed Threat Detection
Blockchain
Blockchain Service
Serviços de mídia
Media Processing Center
Video On Demand
Live
SparkRTC
Armazenamento
Object Storage Service
Elastic Volume Service
Cloud Backup and Recovery
Cloud Server Backup Service
Storage Disaster Recovery Service
Scalable File Service
Volume Backup Service
Data Express Service
Dedicated Distributed Storage Service
Containers
Cloud Container Engine
SoftWare Repository for Container
Application Service Mesh
Ubiquitous Cloud Native Service
Cloud Container Instance
Bancos de dados
Relational Database Service
Document Database Service
Data Admin Service
Data Replication Service
GeminiDB
GaussDB
Distributed Database Middleware
Database and Application Migration UGO
TaurusDB
Middleware
Distributed Cache Service
API Gateway
Distributed Message Service for Kafka
Distributed Message Service for RabbitMQ
Distributed Message Service for RocketMQ
Cloud Service Engine
EventGrid
Dedicated Cloud
Dedicated Computing Cluster
Aplicações de negócios
ROMA Connect
Message & SMS
Domain Name Service
Edge Data Center Management
Meeting
AI
Face Recognition Service
Graph Engine Service
Content Moderation
Image Recognition
Data Lake Factory
Optical Character Recognition
ModelArts
ImageSearch
Conversational Bot Service
Speech Interaction Service
Huawei HiLens
Developer Tools
SDK Developer Guide
API Request Signing Guide
Terraform
Koo Command Line Interface
Distribuição de conteúdo e computação de borda
Content Delivery Network
Intelligent EdgeFabric
CloudPond
Soluções
SAP Cloud
High Performance Computing
Serviços para desenvolvedore
ServiceStage
CodeArts
CodeArts PerfTest
CodeArts Req
CodeArts Pipeline
CodeArts Build
CodeArts Deploy
CodeArts Artifact
CodeArts TestPlan
CodeArts Check
Cloud Application Engine
MacroVerse aPaaS
KooPhone
KooDrive
Central de ajuda/ GaussDB(DWS)/ Melhores práticas/ Migração de dados/ Uso de trabalhos de Flink de DLI para gravar dados do Kafka para GaussDB(DWS) em tempo real

Uso de trabalhos de Flink de DLI para gravar dados do Kafka para GaussDB(DWS) em tempo real

Atualizado em 2024-05-09 GMT+08:00

Esta prática demonstra como usar trabalhos do Flink de DLI para sincronizar dados de consumo do Kafka para o GaussDB(DWS) em tempo real. O processo de demonstração inclui gravar e atualizar os dados existentes em tempo real.

Figura 1 Importação de dados do Kafka para o GaussDB(DWS) em tempo real

Essa prática leva cerca de 90 minutos. Os serviços de nuvem usados nessa prática incluem Virtual Private Cloud (VPC) e sub-redes, Elastic Load Balance (ELB), Elastic Cloud Server (ECS), Object Storage Service (OBS), Distributed Message Service (DMS) for Kafka, Data Lake Insight (DLI) e Data Warehouse Service (DWS). O processo básico é o seguinte:

  1. Preparações
  2. Passo 1: criar uma instância do Kafka
  3. Passo 2: criar um cluster do GaussDB(DWS) e uma tabela de destino
  4. Passo 3: criar uma fila de DLI
  5. Passo 4: criar uma conexão de origem de dados avançada para Kafka e GaussDB(DWS)
  6. Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink
  7. Passo 6: criar e editar um trabalho do Flink de DLI
  8. Passo 7: criar e modificar mensagens no cliente do Kafka

Descrição do cenário

Suponha que os dados de exemplo da fonte de dados de Kafka é uma tabela de informações do usuário, como mostrado em Tabela 1, que contém os campos id, name e age. O campo id é único e fixo, que é compartilhado por vários sistemas de serviço. Geralmente, o campo id não precisa ser modificado. Somente os campos name e age precisam ser modificados.

Use o Kafka para gerar os três grupos de dados a seguir e use os trabalhos de Flink de DLI para sincronizar os dados com o GaussDB(DWS): Altere os usuários cujos IDs são 2 e 3 para jim e tom e use os trabalhos de Flink de DLI para atualizar dados e sincronizar os dados com o GaussDB(DWS).

Tabela 1 Dados de amostra

id

name

age

1

lily

16

2

lucy > jim

17

3

lilei > tom

15

Restrições

  • Certifique-se de que VPC, ECS, OBS, Kafka, DLI e GaussDB(DWS) estejam na mesma região, por exemplo, China-Hong Kong.
  • Certifique-se de que Kafka, DLI e GaussDB(DWS) possam se comunicar uns com os outros. Nesta prática, Kafka e GaussDB(DWS) são criados na mesma região e VPC, e os grupos de segurança de Kafka e GaussDB(DWS) permitem o segmento de rede das filas de DLI.
  • Para garantir que a ligação entre DLI e DWS é estável, vincule o serviço ELB para o cluster de armazém de dados criado.

Preparativos

  • Você registrou uma conta da Huawei e ativou os serviços da Huawei Cloud.. Antes de usar o GaussDB(DWS), verifique o status da conta. A conta não pode estar em atraso ou congelada.
  • Você criou uma VPC e uma sub-rede. Para obter detalhes, consulte Criação de uma VPC.

Passo 1: criar uma instância do Kafka

  1. Faça logon no console de gerenciamento do Huawei Cloud e escolha Middleware > Distributed Message Service (for Kafka) na lista de serviços. O console de gerenciamento do Kafka é exibido.
  2. Clique em DMS for Kafka à esquerda e clique em Buy Instance no canto superior direito.
  3. Defina os seguintes parâmetros. Retém os valores padrão para outros parâmetros que não estão descritos na tabela.

    Tabela 2 Parâmetros de instância do Kafka

    Parâmetro

    Valor

    Billing Mode

    Pay-per-use

    Region

    CN-Hong Kong

    Project

    Default

    AZ

    AZ 1 (Se não estiver disponível, selecione outra AZ.)

    Instance Name

    kafka-dli-dws

    Enterprise Project

    default

    Specifications

    Default

    Version

    2.7

    CPU Architecture

    x86

    Broker Flavor

    kafka.2u4g.cluster.small (apenas para referência. Selecione o menor flavor.)

    Brokers

    3

    VPC

    Selecione uma VPC criada. Se nenhuma VPC estiver disponível, crie uma.

    Security Group

    Selecione um grupo de segurança criado. Se nenhum grupo de segurança estiver disponível, crie um.

    Other parameters

    Mantenha o valor padrão.

    Figura 2 Criar uma instância do Kafka

  4. Clique em Buy e conclua o pagamento. Espere até que a criação seja bem sucedida.
  5. Na lista de instâncias do Kafka, clique no nome da instância criada do Kafka. A página Basic Information é exibida.
  6. Escolha Topics à esquerda e clique em Create Topic.

    Defina Topic Name como topic-demo e mantenha os valores padrão para outros parâmetros.

    Figura 3 Criação de um tópico

  7. Clique em OK. Na lista de tópicos, você pode ver que o topic-demo foi criado com sucesso.
  8. Escolha Consumer Groups à esquerda e clique em Create Consumer Group.
  9. Insira kafka01 para Consumer Group Name e clique em OK.

Passo 2: criar um cluster do GaussDB(DWS) e uma tabela de destino

  1. Crie um balanceador de carga dedicado, defina Network Type como IPv4 private network. Defina Region e VPC com os mesmos valores da instância do Kafka. Neste exemplo, defina Region como China-Hong Kong.
  2. Criação de um cluster. Para garantir a conectividade de rede, a região e a VPC do cluster de GaussDB(DWS) devem ser as mesmas da instância do Kafka. Nesta prática, a região e a VPC são China-Hong Kong.. A VPC deve ser a mesma que a criada para o Kafka.
  3. Na página Clusters do console do GaussDB(DWS), localize a linha que contém o cluster de destino e clique em Login na coluna Operation.

    Esta prática usa a versão 8.1.3.x como exemplo. 8.1.2 e versões anteriores não suportam este modo de logon. Você pode usar o Data Studio para se conectar a um cluster. Para obter detalhes, consulte Uso do Data Studio para se conectar a um cluster.

  4. O nome de usuário de logon é dbadmin, o nome do banco de dados é gaussdb e a senha é a senha do usuário dbadmin definida durante a criação do cluster do armazém de dados. Selecione Remember Password, ative Collect Metadata Periodically e Show Executed SQL Statements e clique em Log In.

    Figura 4 Fazer login no GaussDB(DWS)

  5. Clique no nome do banco de dados gaussdb e clique em SQL Window no canto superior direito para acessar o editor SQL.
  6. Copie a seguinte instrução SQL. Na janela SQL, clique em Execute SQL para criar a tabela de destino user_dws.

    1
    2
    3
    4
    5
    6
    CREATE TABLE user_dws (
    id int,
    name varchar(50),
    age int,
    PRIMARY KEY (id)
    );
    

Passo 3: criar uma fila de DLI

  1. Faça logon no console de gerenciamento da Huawei Cloud e escolha Analytics > Data Lake Insight na lista de serviços. O console de gerenciamento do DLI é exibido.
  2. No painel de navegação à esquerda, escolha Resource Management > Queue Manager.
  3. Clique em Buy Queue no canto superior direito, defina os seguintes parâmetros e mantenha os valores padrão para outros parâmetros que não estão descritos na tabela.

    Tabela 3 Parâmetros da fila de DLI

    Parâmetro

    Valor

    Billing Mode

    Pay-per-use

    Region

    CN-Hong Kong

    Project

    Default

    Name

    dli_dws

    Type

    Para uma fila geral, selecione Dedicated Resource Mode.

    AZ Mode

    Single-AZ deployment

    Specifications

    16 CUs

    Enterprise Project

    default

    Advanced Settings

    Custom

    CIDR Block

    172.16.0.0/18. Ele deve estar em um segmento de rede diferente do Kafka e do GaussDB(DWS). Por exemplo, se Kafka e GaussDB(DWS) estiverem no segmento de rede 192.168.x.x, selecione 172.16.x.x para DLI.

    Figura 5 Criar uma fila do DLI

  4. Clique em Buy.

Passo 4: criar uma conexão de origem de dados avançada para Kafka e GaussDB(DWS)

  1. No grupo de segurança do Kafka, permita o segmento de rede onde a fila do DLI está localizada.

    1. Retorne ao console do Kafka e clique no nome da instância do Kafka para acessar a página Basic Information. Visualize o valor de Instance Address (Private Network) nas informações de conexão e registre o endereço para uso futuro.
      Figura 6 Endereço de rede privada do Kafka
    2. Clique no nome do grupo de segurança.
      Figura 7 Grupo de segurança do Kafka
    3. Escolha Inbound Rules > Add Rule, conforme mostrado na figura a seguir. Adicione o segmento de rede da fila do DLI. Neste exemplo, o segmento de rede é 172.16.0.0/18. Assegure-se de que o segmento de rede seja o mesmo que aquele entrado durante Passo 3: criar uma fila de DLI.
      Figura 8 Adicionar regras ao grupo de segurança do Kafka
    4. Clique em OK.

  2. Retorne ao console de gerenciamento do DLI, clique em Datasource Connections à esquerda, selecione Enhanced e clique em Create.
  3. Defina os seguintes parâmetros. Retém os valores padrão para outros parâmetros que não estão descritos na tabela.

    Tabela 4 Conexão de DLI para Kafka

    Parâmetro

    Valor

    Connection Name

    dli_kafka

    Resource Pool

    Selecione a fila do DLI criada dli_dws.

    VPC

    Selecione a VPC do Kafka.

    Subnet

    Selecione a sub-rede onde o Kafka está localizado.

    Other parameters

    Mantenha o valor padrão.

    Figura 9 Criar uma conexão aprimorada

  4. Clique em OK. Aguarde até que a conexão do Kafka seja criada com êxito.
  5. Escolha Resources > Queue Management à esquerda e escolha More > Test Address Connectivity à direita de dli_dws.
  6. Na caixa endereço, digite o endereço IP privado e o número da porta da instância do Kafka obtida em 1.a. (Há três endereços de Kafka. Digite apenas um deles.)

    Figura 10 Testar a conectividade do Kafka

  7. Clique em Test para verificar se o DLI está conectado com êxito ao Kafka.
  8. Faça logon no console de gerenciamento do GaussDB(DWS), escolha Clusters à esquerda e clique no nome do cluster para ir para a página de detalhes.
  9. Registre o nome do domínio da rede privada, o número da porta e o endereço do Elastic Load Balance do cluster do armazém de dados para uso futuro.

    Figura 11 Nome de domínio privado e endereço do ELB

  10. Clique no nome do grupo de segurança.

    Figura 12 Grupo de segurança do GaussDB(DWS)

  11. Escolha Inbound Rules > Add Rule, conforme mostrado na figura a seguir. Adicione o segmento de rede da fila do DLI. Neste exemplo, o segmento de rede é 172.16.0.0/18. Assegure-se de que o segmento de rede seja o mesmo que aquele entrado durante Passo 3: criar uma fila de DLI.

    Figura 13 Adicionar uma regra ao grupo de segurança do GaussDB(DWS)

  12. Clique em OK.
  13. Alterne ao console do DLI, escolha Resources > Queue Management à esquerda e clique More > Test Address Connectivity à direita de dli_dws.
  14. Na caixa endereço, digite o endereço IP do Elastic Load Balance e o número da porta do cluster do GaussDB(DWS) obtido em 9.

    Figura 14 Testar a conectividade do GaussDB(DWS)

  15. Clique em Test para verificar se o DLI está conectado com êxito ao GaussDB(DWS).

Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink

dws-connector-flink é uma ferramenta para interconexão com Flink baseada em APIs de JDBC do DWS. Durante a configuração do trabalho do DLI, essa ferramenta e suas dependências são armazenadas no diretório de carregamento da classe do Flink para melhorar a capacidade de importar trabalhos do Flink para GaussDB(DWS).

  1. Acesse https://mvnrepository.com/artifact/com.huaweicloud.dws em um navegador.
  2. Na lista de softwares, selecione a versão mais recente do GaussDB(DWS) Connectors Flink. Nesta prática, selecione DWS Connector Flink 2 12 1 12.

  3. Clique na ramificação 1.0.4. (Clique na ramificação mais recente em cenários reais).

  4. Clique em View All.

  5. Clique em dws-connector-flink_2.12_1.12-1.0.4-jar-with-dependencies.jar para fazer o download para o host local.

  6. Crie um bucket do OBS. Nesta prática, defina o nome do bucket como obs-flink-dws e faça upload do arquivo para o bucket do OBS. Certifique-se de que o bucket esteja na mesma região que o DLI, que nesta prática é China-Hong Kong.

    Figura 15 Fazer upload do pacote JAR para o bucket do OBS

Passo 6: criar e editar um trabalho do Flink de DLI

  1. Retorne ao console de gerenciamento do DLI, escolha Job Management > Flink Jobs à esquerda e clique em Create Job no canto superior direito.
  2. Defina Type para Flink OpenSource SQL e Name para kafka-dws.

    Figura 16 Criare um trabalho

  3. Clique em OK. A página para edição do trabalho é exibida.
  4. Defina os seguintes parâmetros à direita da página. Retém os valores padrão para outros parâmetros que não estão descritos na tabela.

    Tabela 5 Parâmetros do trabalho do Flink

    Parâmetro

    Valor

    Queue

    dli_dws

    Flink Version

    1.12

    UDF Jar

    Selecione o arquivo JAR no bucket do OBS criado em Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink.

    OBS Bucket

    Selecione o bucket criado em Passo 5: preparar a ferramenta dws-connector-flink para interconectar o GaussDB(DWS) com o Flink.

    Enable Checkpointing

    Verifique a caixa.

    Other parameters

    Mantenha o valor padrão.

    Figura 17 Editar um trabalho

  5. Copie o seguinte código SQL para a janela de código SQL à esquerda.

    Obtenha o endereço IP privado e o número da porta da instância do Kafka de 1.a e obtenha o nome de domínio privado de 9.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    CREATE TABLE user_kafka (
      id string,
      name string,
      age int
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'topic-demo',
    'properties.bootstrap.servers' ='Private IP address and port number of the Kafka instance',
      'properties.group.id' = 'kafka01',
      'scan.startup.mode' = 'latest-offset',
      "format" = "json"
    );
    
    CREATE TABLE user_dws (
      id string,
      name string,
      age int,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'dws',
    'url'='jdbc:postgresql://GaussDB(DWS) private network domain name:8000/gaussdb',
      'tableName' = 'public.user_dws',
      'username' = 'dbadmin',
    'password' ='Password of database user dbdamin'
    );
    
    insert into user_dws select * from user_kafka;
    

  6. Clique em Check Semantics e aguarde até que a verificação seja bem-sucedida.

    Se a verificação falhar, verifique se a entrada SQL tem erros de sintaxe.

    Figura 18 Instrução SQL de um trabalho

  7. Clique em Save.
  8. Volte para a home page do console do DLI e escolha Job Management > Flink Jobs à esquerda.
  9. Clique em Start à direita do nome do trabalho kafka-dws e clique em Start Now.

    Aguarde cerca de 1 minuto e atualize a página. Se o status for Running, o trabalho será executado com êxito.

    Figura 19 Status de execução do trabalho

Passo 7: criar e modificar mensagens no cliente do Kafka

  1. Crie um ECS consultando o documento do ECS. Certifique-se de que a região e o VPC do ECS sejam iguais aos do Kafka.
  2. Instale o JDK.

    1. Faça logon no ECS, vá para o diretório /usr/local e faça download do pacote JDK.
      1
      2
      cd /usr/local
      wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
      
    2. Descompacte o pacote JDK baixado.
      1
      tar -zxvf jdk-17_linux-x64_bin.tar.gz
      
    3. Execute o seguinte comando para abrir o arquivo /etc/profile:
      1
      vim /etc/profile
      
    4. Pressione i para entrar no modo de edição e adicione o seguinte conteúdo ao final do arquivo /etc/profile:
      1
      2
      3
      4
      5
      export JAVA_HOME=/usr/local/jdk-17.0.7 #JDK installation directory
      export JRE_HOME=${JAVA_HOME}/jre
      export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH 
      export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
      export PATH=$PATH:${JAVA_PATH}
      

    5. Pressione Esc e insira :wq! para salvar a configuração e sair.
    6. Execute o seguinte comando para que as variáveis de ambiente entrem em vigor:
      1
      source /etc/profile
      
    7. Execute o seguinte comando. Se as seguintes informações forem exibidas, o JDK será instalado com sucesso:
      1
      java -version
      

  3. Instale o cliente do Kafka.

    1. Vá para o diretório /opt e execute o seguinte comando para obter o pacote de software cliente do Kafka.
      1
      2
      cd /opt
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
      
    2. Descompacte o pacote de software baixado.
      1
      tar -zxf kafka_2.12-2.7.2.tgz
      
    3. Vá para o diretório do cliente do Kafka.
      1
      cd /opt/kafka_2.12-2.7.2/bin
      

  4. Execute o seguinte comando para se conectar ao Kafka: {Connection address} indica o endereço de conexão de rede interna do Kafka. Para obter detalhes sobre como obter o endereço, consulte 1.a. topic indica o nome do tópico do Kafka criado em 6.

    1
    ./kafka-console-producer.sh --broker-list {connection address} --topic {Topic name}
    

    O seguinte é um exemplo:

    ./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo

    Se > for exibido e nenhuma outra mensagem de erro for exibida, a conexão foi bem-sucedida.

  5. Na janela do cliente do Kafka conectado, copie o seguinte conteúdo (uma linha por vez) com base nos dados planejados em Descrição do cenário e pressione Enter para produzir mensagens:

    1
    2
    3
    {"id":"1","name":"lily","age":"16"}
    {"id":"2","name":"lucy","age":"17"}
    {"id":"3","name":"lilei","age":"15"}
    

  6. Retorne ao console do GaussDB(DWS), escolha Clusters à esquerda e clique em Log In à direita do cluster do GaussDB(DWS). A página SQL é exibida.
  7. Execute a seguinte instrução SQL. Você pode descobrir que os dados são salvos com sucesso no banco de dados em tempo real.

    1
    SELECT * FROM user_dws ORDER BY id;
    

  1. Volte para a janela do cliente para se conectar ao Kafka no ECS, copie o conteúdo a seguir (uma linha por vez) e pressione Enter para produzir mensagens.

    1
    2
    {"id":"2","name":"jim","age":"17"}
    {"id":"3","name":"tom","age":"15"}
    

  2. Volte para a janela SQL aberta do GaussDB(DWS) e execute a seguinte instrução SQL. Verificou-se que os nomes cujos IDs são 2 e 3 foram alterados para jim e tom.

    A descrição do cenário é como esperado. Fim dessa prática.
    1
    SELECT * FROM user_dws ORDER BY id;
    

Usamos cookies para aprimorar nosso site e sua experiência. Ao continuar a navegar em nosso site, você aceita nossa política de cookies. Saiba mais

Feedback

Feedback

Feedback

0/500

Conteúdo selecionado

Envie o conteúdo selecionado com o feedback