连接Topic
概述
Topic创建后,您可以使用命令行、开源Kafka客户端等方式连接Topic,并向Topic生产和消费消息。
本章节主要介绍通过命令行方式连接Topic,如果您是使用的开源Kafka客户端,请参考MQS连接开发(开源客户端)。
前提条件
- 已有可用的Topic,否则请提前创建Topic。
- 根据ROMA Connect实例的Kafka版本,下载对应版本的开源Kafka命令行工具。
- 已在Kafka命令行工具的使用环境中安装Java JDK,并完成相关环境变量的配置。
- 若ROMA Connect实例启用了“MQS SASL_SSL”,还需要在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击“下载SSL证书”下载客户端证书。
- 若ROMA Connect实例的消息集成在开启SASL_SSL的同时,也开启了VPC内网明文访问,则VPC内无法使用SASL方式连接消息集成的Topic。
- 使用SASL方式连接消息集成的Topic时,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,否则会引入时延。
其中,IP地址必须为消息集成的连接地址,host为每个实例主机的名称,可以自定义,但不能重复。例如:
10.10.10.11 host01
10.10.10.12 host02
10.10.10.13 host03
- 一个消费组下的消费者在连接同一个MQS时,最多允许该消费组下的500个消费者进行连接,超过数量的消费者将连接失败。如果一个消费组下有超过500个消费者,且需要连接同一个MQS,需要把消费者拆分到多个消费组下。
已开启SASL认证
若ROMA Connect实例开启了消息集成的SASL_SSL访问,则客户端向Topic生产和消费的消息时会加密传输,安全性更高。以下操作命令以Linux系统为例进行说明。
- 解压Kafka命令行工具和客户端证书。
- 解压命令行工具文件:
tar -zxf kafka_tar
其中,kafka_tar为Kafka命令行工具压缩包的名称。
- 解压客户端证书文件:
unzip cert_zip
其中,cert_zip为客户端证书文件压缩包的名称。
- 解压命令行工具文件:
- 修改Kafka命令行工具配置文件。
在Kafka命令行工具的/config目录中找到consumer.properties和producer.properties文件,并分别在文件中增加如下内容。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN security.protocol=SASL_SSL ssl.truststore.location=/cert/client.truststore.jks ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
- 进入Kafka命令行工具的/bin目录下。
- 向Topic生产消息。
- 执行以下命令,与Topic建立生产消息的连接。
./kafka-console-producer.sh --broker-list Address --topic TopicName --producer.config ../config/producer.properties
其中:
- Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
- TopicName为要生产消息的Topic名称。
- ../config/producer.properties为配置文档所在的相对路径。
- 输入消息内容,向Topic发送消息。
>Message1 >Message2 >Message3
其中,Message1、Message2、Message3为向Topic发送的实际消息内容,一行为一条消息。
- 若要断开与Topic的连接,按“Ctrl+C”断开连接。
- 执行以下命令,与Topic建立生产消息的连接。
- 从Topic消费消息。
一个消费者从一个Topic的多个分区消费消息时,一次只能消费一个分区的消息,多个分区会分多次进行消费。
- 执行以下命令,与Topic建立消费消息的连接并读取消息。
./kafka-console-consumer.sh --bootstrap-server Address --topic TopicName --from-beginning --consumer.config ../config/consumer.properties
其中:
- Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
- TopicName为要消费消息的Topic名称。
- ../config/consumer.properties为配置文档所在的相对路径。
- 执行命令后,会持续连接Topic并读取消息。若要断开与Topic的连接,按“Ctrl+C”断开连接。
- 执行以下命令,与Topic建立消费消息的连接并读取消息。
未启用SASL认证
若ROMA Connect实例未开启消息集成的SASL_SSL访问,则客户端无需加载证书,向Topic生产和消费的消息时不会加密。以下操作命令以Linux系统为例进行说明。
- 解压Kafka命令行工具。
tar -zxf kafka_tar
其中,kafka_tar为Kafka命令行工具压缩包的名称。
- 进入Kafka命令行工具的/bin目录下。
- 向Topic生产消息。
- 执行以下命令,与Topic建立生产消息的连接。
./kafka-console-producer.sh --broker-list Address --topic TopicName
其中:
- Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
- TopicName为要生产消息的Topic名称。
- 输入消息内容,向Topic发送消息。
>Message1 >Message2 >Message3
其中,Message1、Message2、Message3为向Topic发送的实际消息内容,一行为一条消息。
- 若要断开与Topic的连接,按“Ctrl+C”断开连接。
- 执行以下命令,与Topic建立生产消息的连接。
- 从Topic消费消息。
一个消费者从一个Topic的多个分区消费消息时,一次只能消费一个分区的消息,多个分区会分多次进行消费。
- 执行以下命令,与Topic建立消费消息的连接并读取消息。
./kafka-console-consumer.sh --bootstrap-server Address --topic TopicName --from-beginning
其中:
- Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
- TopicName为要消费消息的Topic名称。
- 执行命令后,会持续连接Topic并读取消息。若要断开与Topic的连接,按“Ctrl+C”断开连接。
- 执行以下命令,与Topic建立消费消息的连接并读取消息。