Passo 4: conectar-se a uma instância do Kafka para criar e recuperar mensagens
A seguir, descreve-se como se conectar a uma instância do Kafka no modo de comando do cliente com SASL habilitada.
Para instâncias compradas em julho de 2020 e posteriormente, cada broker de Kafka permite no máximo 1.000 conexões de cada endereço IP por padrão. Para instâncias compradas antes de julho de 2020, cada broker de Kafka permite no máximo 200 conexões de cada endereço IP por padrão. O excesso de conexões será rejeitado. Você pode alterar o limite modificando os parâmetros do Kafka.
Pré-requisitos
- Você configurou corretamente as regras do grupo de segurança. Para mais detalhes, consulte Tabela 1.
- O endereço de conexão da instância foi obtido.
Figura 1 Endereços de conexão de instância do Kafka (SASL ativada) para acesso dentro da VPC
- Você obteve o nome do tópico criado em (Opcional) Passo 3: criar um tópico.
- Você comprou um ECS, instalou o JDK, configurou as variáveis de ambiente e fez download de um cliente de Kafka. Para obter detalhes, consulte o Passo 1: preparar o ambiente.
Preparar arquivo de configuração para criação e recuperação de mensagens
- Efetue logon em um ECS de Linux.
- Mapeie hosts para endereços IP no arquivo /etc/hosts no ECS, para que o cliente possa analisar rapidamente os agentes de instância.
Defina endereços IP para os endereços de conexão da instância obtidos em Pré-requisitos. Defina hosts para os nomes de hosts de instância. Especifique um nome exclusivo para cada host.
Por exemplo:
10.154.48.120 server01
10.154.48.121 server02
10.154.48.122 server03
- Baixe client.truststore.jks. No console do Kafka, clique na instância. Em seguida, na página de detalhes da instância, clique em Download ao lado de SSL Certificate na área Connection.
Descompacte o pacote para obter o arquivo de certificado do cliente client.truststore.jks.
- Adicione os seguintes comandos nos arquivos consumer.properties e producer.properties (PLAIN é usado como exemplo).
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
Descrição:
- username e password são especificados ao ativar SASL_SSL durante a criação da instância.
- ssl.truststore.location é o caminho para armazenar o certificado obtido em 3.
- ssl.truststore.password é certificado pelo servidor, que deve ser definido como dms@kafka e não pode ser alterado.
- ssl.endpoint.identification.algorithm decide se deve verificar o nome de domínio do certificado. Esse parâmetro deve ser deixado em branco, o que indica desabilitar a verificação de nome de domínio.
Criar mensagens
Vá para o diretório /bin do arquivo de cliente de Kafka e execute o seguinte comando:
./kafka-console-producer.sh --broker-list ${connection addr} --topic ${topic name} --producer.config ../config/producer.properties
Descrição
- {connection-address}: o endereço obtido em Pré-requisitos.
- {topic-name}: o nome do tópico criado para a instância de Kafka.
Por exemplo, 192.xxx.xxx.xxx:9093, 192.xxx.xxx.xxx:9093, 192.xxx.xxx.xxx:9093 são os endereços de conexão da instância do Kafka.
Depois de executar o comando anterior, você pode enviar uma mensagem para a instância do Kafka inserindo as informações conforme solicitado e pressionando Enter. O conteúdo em cada linha é enviado como uma mensagem.
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --producer.config ../config/producer.properties >Hello >DMS >Kafka! >^C[root@ecs-kafka bin]#
Pressione Ctrl+C para cancelar.
Recuperar mensagens
Execute o seguinte comando:
./kafka-console-consumer.sh --bootstrap-server ${connection addr} --topic ${topic name} --group ${consumer group name} --from-beginning --consumer.config ../config/consumer.properties
Descrição
- {connection-address}: o endereço obtido em Pré-requisitos.
- {topic-name}: o nome do tópico criado para a instância de Kafka.
- {consumer-group-name}: o nome do grupo de consumidores definido com base em seus requisitos de serviço. Se um nome de grupo de consumidores tiver sido especificado no arquivo de configuração, certifique-se de usar o mesmo nome na linha de comando. Caso contrário, o consumo pode falhar. Se um nome de grupo de consumidores começar com um caractere especial, como um sinal de número (#), os dados de monitoramento não poderão ser exibidos.
Exemplo:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties Hello Kafka! DMS ^CProcessed a total of 3 messages [root@ecs-kafka bin]#
Pressione Ctrl+C para cancelar.
Procedimento de acompanhamento
Você pode configurar regras de alarme para monitorar métricas para receber notificações em tempo hábil quando instâncias, brokers ou tópicos são anormais.