Interconexão do Logstash com o Kafka
Cenário
O Logstash é um pipeline de processamento de dados gratuito e aberto no lado do servidor que integra dados de várias fontes, converte-os e, em seguida, envia-os para o armazenamento especificado. Kafka é um sistema pub/sub de mensagens distribuídas de alto rendimento. É uma das fontes de entrada e saída do Logstash. A seguir descreve como interconectar o Logstash com uma instância do Kafka.
Arquitetura da solução
- A figura a seguir mostra o Kafka como uma fonte de entrada do Logstash.
Figura 1 Kafka como uma fonte de entrada do Logstash
O cliente de coleta de log envia dados para a instância do Kafka. O Logstash extrai dados da instância do Kafka com base em seu desempenho. O uso de uma instância do Kafka como a fonte de entrada do Logstash pode evitar o impacto do tráfego de intermitência no Logstash e desacoplar o cliente de coleta de logs do Logstash para garantir a estabilidade do sistema.
- A figura a seguir mostra o Kafka como uma fonte de saída do Logstash.
Figura 2 Kafka como uma fonte de saída do Logstash
O Logstash coleta dados do banco de dados e envia os dados para a instância do Kafka para armazenamento. O uso de uma instância do Kafka como fonte de saída do Logstash pode armazenar uma grande quantidade de dados graças à alta taxa de transferência do Kafka.
Restrições
O Logstash 7.5 e versões posteriores suportam o Kafka Integration Plugin, que inclui o plug-in de entrada do Kafka e o plug-in de saída do Kafka. O plug-in de entrada do Kafka lê dados de tópicos de instâncias do Kafka e o plug-in de saída do Kafka grava dados em tópicos de instâncias do Kafka. Tabela 1 lista o mapeamento de versão entre o Logstash, o Kafka Integration Plugin e os clientes do Kafka. Certifique-se de que a versão do cliente do Kafka seja posterior ou igual à versão da instância do Kafka.
Pré-requisitos
Faça a seguinte preparação antes da implementação.
- Baixe Logstash.
- Prepare um host do Windows, instale o JDK v1.8.111 ou posterior e o Git Bash no host e configure variáveis de ambiente relacionadas.
- Crie uma instância do Kafka e um tópico e obtenha as informações da instância.
Se o acesso público e a autenticação SASL estiverem desativados para a instância do Kafka, obtenha as informações listadas em Tabela 2.
Tabela 2 Informações da instância do Kafka (acesso público e autenticação SASL desativados) Parâmetro
Como obter
Instance address (private network)
Visualize-o na área Connection na página de detalhes da instância.
Topic name
No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.
O seguinte usa topic-logstash como um exemplo.
Se o acesso público estiver desativado e a autenticação SASL estiver ativada para a instância do Kafka, obtenha as informações listadas em Tabela 3.
Tabela 3 Informações da instância do Kafka (acesso público desativado e autenticação SASL ativada) Parâmetro
Como obter
Instance address (private network)
Visualize-o na área Connection na página de detalhes da instância.
SASL mechanism
Visualize-o na área Connection na página de detalhes da instância.
Security protocol
Visualize-o na área Connection na página de detalhes da instância.
Certificate
Clique em Download ao lado de SSL Certificate na área Connection na página de detalhes da instância. Faça o download e descompacte o pacote para obter o arquivo de certificado do cliente client.truststore.jks.
SASL username and password
No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Users para exibir o nome de usuário. Se você esqueceu a senha, clique em Reset Password.
Topic name
No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.
O seguinte usa topic-logstash como um exemplo.
Se o acesso público estiver ativado e a autenticação SASL estiver desativada para a instância do Kafka, obtenha as informações listadas em Tabela 4.
Tabela 4 Informações da instância do Kafka (acesso público ativado e autenticação SASL desativada) Parâmetro
Como obter
Instance address (public network)
Visualize-o na área Connection na página de detalhes da instância.
Topic name
No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.
O seguinte usa topic-logstash como um exemplo.
Se o acesso público e a autenticação SASL estiverem ativados para a instância do Kafka, obtenha as informações listadas em Tabela 5.
Tabela 5 Informações da instância do Kafka (acesso público e autenticação SASL ativados) Parâmetro
Como obter
Instance address (public network)
Visualize-o na área Connection na página de detalhes da instância.
SASL mechanism
Visualize-o na área Connection na página de detalhes da instância.
Security protocol
Visualize-o na área Connection na página de detalhes da instância.
Certificate
Clique em Download ao lado de SSL Certificate na área Connection na página de detalhes da instância. Faça o download e descompacte o pacote para obter o arquivo de certificado do cliente client.truststore.jks.
SASL username and password
No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Users para exibir o nome de usuário. Se você esqueceu a senha, clique em Reset Password.
Topic name
No console do Kafka, clique em sua instância. No painel de navegação esquerdo, escolha Topics para exibir o nome do tópico.
O seguinte usa topic-logstash como um exemplo.
Procedimento (Instância do Kafka como origem de saída do Logstash)
- No host do Windows, descompacte o pacote do Logstash, vá para a pasta config e crie o arquivo de configuração output.conf.
Figura 3 Criação do arquivo de configuração output.conf
- Adicione o seguinte conteúdo ao arquivo output.conf:
input { stdin {} } output { kafka { bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3" topic_id => "topic-logstash" # If SASL authentication is disabled, comment out the following options: # If the SASL mechanism is PLAIN, configure as follows: sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';" # If the SASL mechanism is SCRAM-SHA-512, configure as follows: sasl_mechanism => "SCRAM-SHA-512" sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';" # If the security protocol is SASL_SSL, configure as follows: security_protocol => "SASL_SSL" ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks" ssl_truststore_password => "dms@kafka" ssl_endpoint_identification_algorithm => "" # If the security protocol is SASL_PLAINTEXT, configure as follows: security_protocol => "SASL_PLAINTEXT" } }
Descrição:
- bootstrap_servers: endereço de conexão de rede privada ou endereço de conexão de rede pública da instância do Kafka.
- topics: nome do tópico.
- sasl_mechanism: mecanismo de autenticação SASL.
- sasl_jaas_config: arquivo de configuração SASL JAAS. Altere o nome de usuário e a senha de SASL conforme necessário.
- security_protocol: protocolo de segurança usado pela instância do Kafka.
- ssl.truststore.location: local onde o certificado SSL está armazenado.
- ssl_truststore_password: senha do certificado do servidor, que deve ser definida como dms@kafka e não pode ser alterada.
- ssl_endpoint_identification_algorithm: indica se o nome de domínio do certificado deve ser verificado. Se esta opção for deixada em branco, o nome de domínio do certificado não é verificado. Neste exemplo, deixe-a em branco.
Para obter mais informações sobre as opções de plug-in de saída do Kafka, consulte Plug-in de saída do Kafka.
- Abra o Git Bash no diretório root da pasta Logstash e execute o seguinte comando para iniciar o Logstash:
./bin/logstash -f ./config/output.conf
Se a mensagem "Successfully started Logstash API endpoint" for exibida, o Logstash foi iniciado.
Figura 4 Início do Logstash
- No Logstash, produza mensagens, conforme mostrado na figura a seguir.
Figura 5 Produção de mensagens
- Vá para o console do Kafka e clique em sua instância.
- No painel de navegação esquerdo, escolha Message Query.
- Selecione topic-logstash na caixa de listagem suspensa Topic Name e clique em Search para consultar as mensagens.
Como mostrado em Figura 6, o plug-in de saída do Kafka do Logstash gravou dados em topic-logstash da instância do Kafka.
Procedimento (Instância de Kafka como a origem de entrada do Logstash)
- No host do Windows, descompacte o pacote de Logstash, vá para a pasta config e crie o arquivo de configuração input.conf.
Figura 7 Criação do arquivo de configuração input.conf
- Adicione o seguinte conteúdo ao arquivo input.conf para se conectar à instância do Kafka:
input { kafka { bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3" group_id => "logstash_group" topic_id => "topic-logstash" auto_offset_reset => "earliest" # If SASL authentication is disabled, comment out the following options: #If the SASL mechanism is PLAIN, configure as follows: sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';" # If the SASL mechanism is SCRAM-SHA-512, configure as follows: sasl_mechanism => "SCRAM-SHA-512" sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';" # If the security protocol is SASL_SSL, configure as follows: security_protocol => "SASL_SSL" ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks" ssl_truststore_password => "dms@kafka" ssl_endpoint_identification_algorithm => "" # If the security protocol is SASL_PLAINTEXT, configure as follows: security_protocol => "SASL_PLAINTEXT" } } output { stdout{codec=>rubydebug} }
Descrição:
- bootstrap_servers: endereço de conexão de rede privada ou endereço de conexão de rede pública da instância do Kafka.
- group_id: nome do grupo de consumidores.
- topics: nome do tópico.
- auto_offset_reset: política de consumo dos consumidores. Este exemplo usa earliest.
- sasl_mechanism: mecanismo de autenticação SASL.
- sasl_jaas_config: arquivo de configuração SASL JAAS. Altere o nome de usuário e a senha de SASL conforme necessário.
- security_protocol: protocolo de segurança usado pela instância do Kafka.
- ssl.truststore.location: local onde o certificado SSL está armazenado.
- ssl_truststore_password: senha do certificado do servidor, que deve ser definida como dms@kafka e não pode ser alterada.
- ssl_endpoint_identification_algorithm: indica se o nome de domínio do certificado deve ser verificado. Se esta opção for deixada em branco, o nome de domínio do certificado não é verificado. Neste exemplo, deixe-a em branco.
Para obter mais informações sobre as opções de plug-in de entrada do Kafka, consulte Plug-in de entrada do Kafka.
- Abra o Git Bash no diretório root da pasta Logstash e execute o seguinte comando para iniciar o Logstash:
./bin/logstash -f ./config/input.conf
Depois que o Logstash é iniciado com sucesso, o plug-in de entrada do Kafka lê automaticamente os dados do topic-logstash da instância do Kafka, conforme mostrado na figura a seguir.
Figura 8 Logstash lendo dados de topic-logstash