Este conteúdo foi traduzido por máquina para sua conveniência e a Huawei Cloud não pode garantir que o conteúdo foi traduzido com precisão. Para exibir o conteúdo original, use o link no canto superior direito para mudar para a página em inglês.
Atualizado em 2024-09-09 GMT+08:00

Interconexão do Logstash com o Kafka

Cenário

O Logstash é um pipeline de processamento de dados gratuito e aberto no lado do servidor que integra dados de várias fontes, converte-os e, em seguida, envia-os para o armazenamento especificado. Kafka é um sistema pub/sub de mensagens distribuídas de alto rendimento. É uma das fontes de entrada e saída do Logstash. A seguir descreve como interconectar o Logstash com uma instância do Kafka.

Arquitetura da solução

  • A figura a seguir mostra o Kafka como uma fonte de entrada do Logstash.
    Figura 1 Kafka como uma fonte de entrada do Logstash

    O cliente de coleta de log envia dados para a instância do Kafka. O Logstash extrai dados da instância do Kafka com base em seu desempenho. O uso de uma instância do Kafka como a fonte de entrada do Logstash pode evitar o impacto do tráfego de intermitência no Logstash e desacoplar o cliente de coleta de logs do Logstash para garantir a estabilidade do sistema.

  • A figura a seguir mostra o Kafka como uma fonte de saída do Logstash.
    Figura 2 Kafka como uma fonte de saída do Logstash

    O Logstash coleta dados do banco de dados e envia os dados para a instância do Kafka para armazenamento. O uso de uma instância do Kafka como fonte de saída do Logstash pode armazenar uma grande quantidade de dados graças à alta taxa de transferência do Kafka.

Restrições

O Logstash 7.5 e versões posteriores suportam o Kafka Integration Plugin, que inclui o plug-in de entrada do Kafka e o plug-in de saída do Kafka. O plug-in de entrada do Kafka lê dados de tópicos de instâncias do Kafka e o plug-in de saída do Kafka grava dados em tópicos de instâncias do Kafka. Tabela 1 lista o mapeamento de versão entre o Logstash, o Kafka Integration Plugin e os clientes do Kafka. Certifique-se de que a versão do cliente do Kafka seja posterior ou igual à versão da instância do Kafka.

Tabela 1 Mapeamento de versões

Versão do Logstash

Versão do Kafka Integration Plugin

Versão do cliente do Kafka

8.3–8.8

10.12.0

2.8.1

8.0–8.2

10.9.0–10.10.0

2.5.1

7.12–7.17

10.7.4–10.9.0

2.5.1

7.8–7.11

10.2.0–10.7.1

2.4

7.6–7.7

10.0.1

2.3.0

7.5

10.0.0

2.1.0

Pré-requisitos

Faça a seguinte preparação antes da implementação.

  • Baixe Logstash.
  • Prepare um host do Windows, instale o JDK v1.8.111 ou posterior e o Git Bash no host e configure variáveis de ambiente relacionadas.
  • Crie uma instância do Kafka e um tópico e obtenha as informações da instância.

    Se o acesso público e a autenticação SASL estiverem desativados para a instância do Kafka, obtenha as informações listadas em Tabela 2.

    Tabela 2 Informações da instância do Kafka (acesso público e autenticação SASL desativados)

    Parâmetro

    Como obter

    Instance address (private network)

    Visualize-o na área Connection na página de detalhes da instância.

    Topic name

    No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.

    O seguinte usa topic-logstash como um exemplo.

    Se o acesso público estiver desativado e a autenticação SASL estiver ativada para a instância do Kafka, obtenha as informações listadas em Tabela 3.

    Tabela 3 Informações da instância do Kafka (acesso público desativado e autenticação SASL ativada)

    Parâmetro

    Como obter

    Instance address (private network)

    Visualize-o na área Connection na página de detalhes da instância.

    SASL mechanism

    Visualize-o na área Connection na página de detalhes da instância.

    Security protocol

    Visualize-o na área Connection na página de detalhes da instância.

    Certificate

    Clique em Download ao lado de SSL Certificate na área Connection na página de detalhes da instância. Faça o download e descompacte o pacote para obter o arquivo de certificado do cliente client.truststore.jks.

    SASL username and password

    No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Users para exibir o nome de usuário. Se você esqueceu a senha, clique em Reset Password.

    Topic name

    No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.

    O seguinte usa topic-logstash como um exemplo.

    Se o acesso público estiver ativado e a autenticação SASL estiver desativada para a instância do Kafka, obtenha as informações listadas em Tabela 4.

    Tabela 4 Informações da instância do Kafka (acesso público ativado e autenticação SASL desativada)

    Parâmetro

    Como obter

    Instance address (public network)

    Visualize-o na área Connection na página de detalhes da instância.

    Topic name

    No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.

    O seguinte usa topic-logstash como um exemplo.

    Se o acesso público e a autenticação SASL estiverem ativados para a instância do Kafka, obtenha as informações listadas em Tabela 5.

    Tabela 5 Informações da instância do Kafka (acesso público e autenticação SASL ativados)

    Parâmetro

    Como obter

    Instance address (public network)

    Visualize-o na área Connection na página de detalhes da instância.

    SASL mechanism

    Visualize-o na área Connection na página de detalhes da instância.

    Security protocol

    Visualize-o na área Connection na página de detalhes da instância.

    Certificate

    Clique em Download ao lado de SSL Certificate na área Connection na página de detalhes da instância. Faça o download e descompacte o pacote para obter o arquivo de certificado do cliente client.truststore.jks.

    SASL username and password

    No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Users para exibir o nome de usuário. Se você esqueceu a senha, clique em Reset Password.

    Topic name

    No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.

    O seguinte usa topic-logstash como um exemplo.

Procedimento (Instância do Kafka como origem de saída do Logstash)

  1. No host do Windows, descompacte o pacote do Logstash, vá para a pasta config e crie o arquivo de configuração output.conf.

    Figura 3 Criação do arquivo de configuração output.conf

  2. Adicione o seguinte conteúdo ao arquivo output.conf:

    input {
        stdin {}
    }
    output {
     kafka {
    	 bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3"
    	 topic_id => "topic-logstash"
    	
    	# If SASL authentication is disabled, comment out the following options:
    	 # If the SASL mechanism is PLAIN, configure as follows:
    	 sasl_mechanism => "PLAIN"
    	 sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';"
    	
    	 # If the SASL mechanism is SCRAM-SHA-512, configure as follows:
    	 sasl_mechanism => "SCRAM-SHA-512"
    	 sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';"
    		
    	 # If the security protocol is SASL_SSL, configure as follows:
    	 security_protocol => "SASL_SSL"
    	 ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks"
    	 ssl_truststore_password => "dms@kafka"
    	 ssl_endpoint_identification_algorithm => ""
    	
    	# If the security protocol is SASL_PLAINTEXT, configure as follows:
    	 security_protocol => "SASL_PLAINTEXT"
    	 }
    }

    Descrição:

    • bootstrap_servers: endereço de conexão de rede privada ou endereço de conexão de rede pública da instância do Kafka.
    • topics: nome do tópico.
    • sasl_mechanism: mecanismo de autenticação SASL.
    • sasl_jaas_config: arquivo de configuração SASL JAAS. Altere o nome de usuário e a senha de SASL conforme necessário.
    • security_protocol: protocolo de segurança usado pela instância do Kafka.
    • ssl.truststore.location: local onde o certificado SSL está armazenado.
    • ssl_truststore_password: senha do certificado do servidor, que deve ser definida como dms@kafka e não pode ser alterada.
    • ssl_endpoint_identification_algorithm: indica se o nome de domínio do certificado deve ser verificado. Se esta opção for deixada em branco, o nome de domínio do certificado não é verificado. Neste exemplo, deixe-a em branco.

    Para obter mais informações sobre as opções de plug-in de saída do Kafka, consulte Plug-in de saída do Kafka.

  3. Abra o Git Bash no diretório root da pasta Logstash e execute o seguinte comando para iniciar o Logstash:

    ./bin/logstash -f ./config/output.conf

    Se a mensagem "Successfully started Logstash API endpoint" for exibida, o Logstash foi iniciado.

    Figura 4 Início do Logstash

  4. No Logstash, produza mensagens, conforme mostrado na figura a seguir.

    Figura 5 Produção de mensagens

  5. Vá para o console do Kafka e clique em sua instância.
  6. No painel de navegação esquerdo, escolha Message Query.
  7. Selecione topic-logstash na caixa de listagem suspensa Topic Name e clique em Search para consultar as mensagens.

    Figura 6 Consulta das mensagens

    Como mostrado em Figura 6, o plug-in de saída do Kafka do Logstash gravou dados em topic-logstash da instância do Kafka.

Procedimento (Instância de Kafka como a origem de entrada do Logstash)

  1. No host do Windows, descompacte o pacote de Logstash, vá para a pasta config e crie o arquivo de configuração input.conf.

    Figura 7 Criação do arquivo de configuração input.conf

  2. Adicione o seguinte conteúdo ao arquivo input.conf para se conectar à instância do Kafka:

    input {
     kafka {
    	 bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3"
    	 group_id => "logstash_group"
    	 topic_id => "topic-logstash"
    	 auto_offset_reset => "earliest"
    	
    	# If SASL authentication is disabled, comment out the following options:
    	 #If the SASL mechanism is PLAIN, configure as follows:
    	 sasl_mechanism => "PLAIN"
    	 sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';"
    	
    	 # If the SASL mechanism is SCRAM-SHA-512, configure as follows:
    	 sasl_mechanism => "SCRAM-SHA-512"
    	 sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';"
    		
    	 # If the security protocol is SASL_SSL, configure as follows:
    	 security_protocol => "SASL_SSL"
    	 ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks"
    	 ssl_truststore_password => "dms@kafka"
    	 ssl_endpoint_identification_algorithm => ""
    	
    	# If the security protocol is SASL_PLAINTEXT, configure as follows:
    	 security_protocol => "SASL_PLAINTEXT"
    	 }
    }
    output {
     stdout{codec=>rubydebug}
    }

    Descrição:

    • bootstrap_servers: endereço de conexão de rede privada ou endereço de conexão de rede pública da instância do Kafka.
    • group_id: nome do grupo de consumidores.
    • topics: nome do tópico.
    • auto_offset_reset: política de consumo dos consumidores. Este exemplo usa earliest.
    • sasl_mechanism: mecanismo de autenticação SASL.
    • sasl_jaas_config: arquivo de configuração SASL JAAS. Altere o nome de usuário e a senha de SASL conforme necessário.
    • security_protocol: protocolo de segurança usado pela instância do Kafka.
    • ssl.truststore.location: local onde o certificado SSL está armazenado.
    • ssl_truststore_password: senha do certificado do servidor, que deve ser definida como dms@kafka e não pode ser alterada.
    • ssl_endpoint_identification_algorithm: indica se o nome de domínio do certificado deve ser verificado. Se esta opção for deixada em branco, o nome de domínio do certificado não é verificado. Neste exemplo, deixe-a em branco.

    Para obter mais informações sobre as opções de plug-in de entrada do Kafka, consulte Plug-in de entrada do Kafka.

  3. Abra o Git Bash no diretório root da pasta Logstash e execute o seguinte comando para iniciar o Logstash:

    ./bin/logstash -f ./config/input.conf

    Depois que o Logstash é iniciado com sucesso, o plug-in de entrada do Kafka lê automaticamente os dados do topic-logstash da instância do Kafka, conforme mostrado na figura a seguir.

    Figura 8 Logstash lendo dados de topic-logstash