更新时间:2023-09-21 GMT+08:00

连接Topic

概述

Topic创建后,您可以使用命令行、开源Kafka客户端等方式连接Topic,并向Topic生产和消费消息。

本章节主要介绍通过命令行方式连接Topic,如果您是使用的开源Kafka客户端,请参考MQS连接开发(开源客户端)

前提条件

  • 已有可用的Topic,否则请提前创建Topic
  • 根据ROMA Connect实例的Kafka版本,下载对应版本的开源Kafka命令行工具。
    您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。
  • 已在Kafka命令行工具的使用环境中安装Java JDK,并完成相关环境变量的配置。
  • 若ROMA Connect实例启用了“MQS SASL_SSL”,还需要在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击“下载SSL证书”下载客户端证书
  • 若ROMA Connect实例的消息集成在开启SASL_SSL的同时,也开启了VPC内网明文访问,则VPC内无法使用SASL方式连接消息集成的Topic。
  • 使用SASL方式连接消息集成的Topic时,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,否则会引入时延。

    其中,IP地址必须为消息集成的连接地址,host为每个实例主机的名称,可以自定义,但不能重复。例如:

    10.10.10.11 host01

    10.10.10.12 host02

    10.10.10.13 host03

  • 一个消费组下的消费者在连接同一个MQS时,最多允许该消费组下的500个消费者进行连接,超过数量的消费者将连接失败。如果一个消费组下有超过500个消费者,且需要连接同一个MQS,需要把消费者拆分到多个消费组下。

已开启SASL认证

若ROMA Connect实例开启了消息集成的SASL_SSL访问,则客户端向Topic生产和消费的消息时会加密传输,安全性更高。以下操作命令以Linux系统为例进行说明。

  1. 解压Kafka命令行工具和客户端证书。

    进入文件压缩包所在的目录,然后执行以下命令解压文件。

    • 解压命令行工具文件:
      tar -zxf kafka_tar

      其中,kafka_tar为Kafka命令行工具压缩包的名称。

    • 解压客户端证书文件:
      unzip cert_zip

      其中,cert_zip为客户端证书文件压缩包的名称。

  2. 修改Kafka命令行工具配置文件。

    在Kafka命令行工具的/config目录中找到consumer.propertiesproducer.properties文件,并分别在文件中增加如下内容。

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="**********" \
    password="**********";
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    ssl.truststore.location=/cert/client.truststore.jks
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=
    • usernamepassword的值分别为Topic所属集成应用的Key和Secret。您可以参考查看和编辑集成应用获取Key和Secret。
    • ssl.truststore.location的值为1中解压得到的客户端证书的存放路径,请根据实际情况填写。注意,Windows系统下证书路径中必须使用“/”。
    • ssl.truststore.password为服务器证书密码,不可更改,值固定设置为dms@kafka。
  3. 进入Kafka命令行工具的/bin目录下。

    注意,Windows系统下需要进入/bin/windows目录下。

  4. 向Topic生产消息。
    1. 执行以下命令,与Topic建立生产消息的连接。
      ./kafka-console-producer.sh --broker-list Address --topic TopicName --producer.config ../config/producer.properties

      其中:

      • Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
      • TopicName为要生产消息的Topic名称。
      • ../config/producer.properties为配置文档所在的相对路径。
    2. 输入消息内容,向Topic发送消息。
      >Message1
      >Message2
      >Message3

      其中,Message1Message2Message3为向Topic发送的实际消息内容,一行为一条消息。

    3. 若要断开与Topic的连接,按“Ctrl+C”断开连接。
  5. 从Topic消费消息。

    一个消费者从一个Topic的多个分区消费消息时,一次只能消费一个分区的消息,多个分区会分多次进行消费。

    1. 执行以下命令,与Topic建立消费消息的连接并读取消息。
      ./kafka-console-consumer.sh --bootstrap-server Address --topic TopicName --from-beginning --consumer.config ../config/consumer.properties

      其中:

      • Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
      • TopicName为要消费消息的Topic名称。
      • ../config/consumer.properties为配置文档所在的相对路径。
    2. 执行命令后,会持续连接Topic并读取消息。若要断开与Topic的连接,按“Ctrl+C”断开连接。

未启用SASL认证

若ROMA Connect实例未开启消息集成的SASL_SSL访问,则客户端无需加载证书,向Topic生产和消费的消息时不会加密。以下操作命令以Linux系统为例进行说明。

  1. 解压Kafka命令行工具。

    进入文件压缩包所在的目录,然后执行以下命令解压文件。

    tar -zxf kafka_tar

    其中,kafka_tar为Kafka命令行工具压缩包的名称。

  2. 进入Kafka命令行工具的/bin目录下。

    注意,Windows系统下需要进入/bin/windows目录下。

  3. 向Topic生产消息。
    1. 执行以下命令,与Topic建立生产消息的连接。
      ./kafka-console-producer.sh --broker-list Address --topic TopicName

      其中:

      • Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
      • TopicName为要生产消息的Topic名称。
    2. 输入消息内容,向Topic发送消息。
      >Message1
      >Message2
      >Message3

      其中,Message1Message2Message3为向Topic发送的实际消息内容,一行为一条消息。

    3. 若要断开与Topic的连接,按“Ctrl+C”断开连接。
  4. 从Topic消费消息。

    一个消费者从一个Topic的多个分区消费消息时,一次只能消费一个分区的消息,多个分区会分多次进行消费。

    1. 执行以下命令,与Topic建立消费消息的连接并读取消息。
      ./kafka-console-consumer.sh --bootstrap-server Address --topic TopicName --from-beginning

      其中:

      • Address为ROMA Connect的消息集成连接地址,您可以参考查看实例信息获取消息集成MQS连接地址。如果您是公网访问,则使用公网连接地址;如果是VPC内访问,则使用内网连接地址。
      • TopicName为要消费消息的Topic名称。
    2. 执行命令后,会持续连接Topic并读取消息。若要断开与Topic的连接,按“Ctrl+C”断开连接。