更新时间:2024-07-10 GMT+08:00

使用ROMA Connect集成消息

概述

ROMA Connect提供了安全、标准化消息通道,实现不同消息系统的集成对接。

本章节通过完成一个使用Kafka命令行与ROMA Connect对接的配置样例,帮助您快速熟悉使用ROMA Connect集成消息的过程。

使用ROMA Connect集成消息的步骤如下所示:

准备工作

在开始操作前,您需要提前完成以下准备工作。

  • ROMA Connect实例已绑定弹性公网IP。
  • 已在本地PC下载并安装Java JDK,并完成相关环境变量的配置。
  • 根据ROMA Connect实例的Kafka版本,下载对应版本的开源Kafka命令行工具。您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。

    ROMA Connect的Kafka服务端版本为1.1.0或2.3.0版本,请使用与服务端相同版本的Kafka命令行工具,避免出现不可预知的问题。

  • 若ROMA Connect实例启用了“MQS SASL_SSL”,需要在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击“下载SSL证书”下载。

步骤一:创建消息Topic

Topic是消息客户端与ROMA Connect间进行消息传输的通道,客户端通过Topic向ROMA Connect收发消息。

  1. 创建集成应用。
    1. 登录ROMA Connect控制台,在“实例”页面单击实例上的“查看控制台”,进入实例控制台。
    2. 在左侧的导航栏选择“集成应用”,单击页面右上角的“创建集成应用”。
    3. 在创建集成应用弹窗中填写集成应用的“名称”,然后单击“确认”。
  2. 创建消息Topic。
    1. 在左侧的导航栏选择“消息集成 MQS > Topic管理”,单击页面右上角的“创建Topic”。
    2. 在创建Topic弹窗中填写Topic相关配置信息,然后单击“确定”,创建Topic。
      图1 创建Topic
      表1 Topic配置

      参数

      配置说明

      Topic名称

      填写Topic的名称,根据规划自定义。建议您按照一定的命名规则填写Topic名称,方便您快速识别和查找。

      集成应用

      选择1中创建的集成应用。

      权限

      为Topic所属的集成应用选择对Topic的操作权限,此处选择“发布+订阅”,即该Topic可用于生产和消费消息。

      分区数

      合理设置分区数量,可以提升消息生产与消费的并发性能。为简单起见,此处使用默认值“3”。

      副本数

      ROMA Connect会自动在每个副本上备份数据,当其中一个副本故障时数据依然可用,Topic的副本数越多,可靠性越高。为简单起见,此处使用默认值“3”。

      老化时间(小时)

      超过老化时间后,Topic中存储的消息将会被删除。为简单起见,此处使用默认值“72”。

      同步复制

      客户端向Topic生产消息时,是否把消息复制给所有副本,然后才向消息客户端返回响应。为简单起见,此处不启用。

      同步落盘

      消息客户端向Topic生产的每条消息是否立即写入磁盘。为简单起见,此处不启用。

      标签

      添加Topic的标签信息,用于快速过滤和查找Topic。为简单起见,此处不添加标签。

      敏感字段

      添加Topic的消息敏感字段。若向Topic生成的消息中包含敏感字段,则敏感字段内容会被屏蔽。为简单起见,此处不添加。

      描述

      填写Topic的描述信息。

步骤二:向Topic收发消息

在本地PC上使用Kafka命令行工具,通过命令行方式向Topic收发消息。

根据ROMA Connect实例是否开启SASL_SSL,向Topic收发消息的操作有所差异。若开启SASL_SSL访问,则客户端向Topic发送和接收的消息会加密传输,安全性更高。

  1. 解压Kafka命令行工具和客户端证书。

    在本地PC找到已下载的Kafka命令行工具和客户端证书文件,并分别解压。

    此处以Windows系统为例,并假设解压后的Kafka命令行工具路径为“D:\kafka_2.11-1.1.0”,客户端证书路径为“D:\cert”。

  2. (可选)修改Kafka命令行工具中的kafka-run-class.bat脚本文件。仅当使用1.1.0版本Kafka命令行工具时需要修改脚本文件,否则跳过此步骤。

    在“D:\kafka_2.11-1.1.0\bin\windows”路径下找到kafka-run-class.bat文件,并在文件内容中的以下脚本行中,为“%CLASSPATH%”加上英文双引号,如下所示。

    set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
  3. (可选)修改Kafka命令行工具配置文件。仅当ROMA Connect实例已开启SASL_SSL时需要修改配置文件,否则跳过此步骤。

    在“D:\kafka_2.11-1.1.0\config”路径下找到consumer.propertiesproducer.properties文件,并分别在文件中增加如下内容。

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="**********" \
    password="**********";
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    ssl.truststore.location=D:/cert/client.truststore.jks
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    其中:

    • usernamepassword的值分别为Topic所属集成应用的Key和Secret。您可以在ROMA Connect实例控制台的“集成应用”页面中,单击集成应用的名称查看并获取Key和Secret。
    • ssl.truststore.location的值为1中解压得到的客户端证书的存放路径。

      Windows系统下证书路径中必须使用“/”。

  4. 在命令行窗口执行以下命令进入Kafka命令行工具的目录。
    d:
    cd kafka_2.11-1.1.0\bin\windows
  5. 向ROMA Connect生产消息。
    1. 执行以下命令,与Topic建立生产消息的连接。
      ROMA Connect实例未开启SASL_SSL时执行的命令:
      kafka-console-producer.bat --broker-list IP:9094,IP:9095,IP:9096 --topic TopicName

      ROMA Connect实例已开启SASL_SSL时执行的命令:

      kafka-console-producer.bat --broker-list IP:9095,IP:9096,IP:9097 --topic TopicName --producer.config ../../config/producer.properties

      其中:

      • IP为ROMA Connect的消息集成连接地址,可在ROMA Connect实例控制台的“实例信息”页面中查看“弹性IP地址”。
      • TopicName步骤一:创建消息Topic中创建Topic的名称。
    2. 输入消息内容,向Topic发送消息。
      >Message1
      >Message2
      >Message3

      其中,Message1Message2Message3为向Topic发送的实际消息内容,一行为一条消息。

    3. 在ROMA Connect实例控制台选择“消息集成MQS > 消息查询”,进入消息查询页面。
    4. 单击“高级搜索”展开高级搜索框。
    5. 输入搜索条件,然后单击“搜索”,查询客户端发送的消息记录。
      • “Topic名称”选择步骤一:创建消息Topic中创建的消息Topic。
      • “查询方式”选择“按生产时间查询”,并选择客户端向ROMA Connect发送消息的时间段。
    6. 单击消息记录后的“消息内容”,查看消息内容,确认是否与5.b中发送的内容一致。
      图2 查看消息
  6. 从ROMA Connect消费消息。
    1. 执行以下命令,与Topic建立消费消息的连接并读取消息。
      ROMA Connect实例未开启SASL_SSL时执行的命令:
      kafka-console-consumer.bat --bootstrap-server IP:9094,IP:9095,IP:9096 --topic TopicName --from-beginning

      ROMA Connect实例已开启SASL_SSL时执行的命令:

      kafka-console-consumer.bat --bootstrap-server IP:9095,IP:9096,IP:9097 --topic TopicName --from-beginning --consumer.config ../../config/consumer.properties

      其中:

      • IP为ROMA Connect的消息集成连接地址,可在ROMA Connect实例控制台的“实例信息”页面中查看“弹性IP地址”。
      • TopicName步骤一:创建消息Topic中创建Topic的名称。
    2. 执行命令后,会持续连接Topic并读取消息。若要停止读取消息,按“Ctrl+C”,然后输入“Y”并回车,结束读取消息。