- 最新动态
- 功能总览
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- 开始使用ROMA Connect
- 实例管理
- 集成应用管理
-
数据源管理
- ROMA Connect支持的数据源
- 接入API数据源
- 接入ActiveMQ数据源
- 接入ArtemisMQ数据源
- 接入DB2数据源
- 接入DIS数据源
- 接入DWS数据源
- 接入DM数据源
- 接入Gauss100数据源
- 接入FTP数据源
- 接入HL7数据源
- 接入HANA数据源
- 接入HIVE数据源
- 接入LDAP数据源
- 接入IBM MQ数据源
- 接入Kafka数据源
- 接入MySQL数据源
- 接入MongoDB数据源
- 接入MQS数据源
- 接入MRS Hive数据源
- 接入MRS HDFS数据源
- 接入MRS HBase数据源
- 接入MRS Kafka数据源
- 接入OBS数据源
- 接入Oracle数据源
- 接入PostgreSQL数据源
- 接入Redis数据源
- 接入RabbitMQ数据源
- 接入RocketMQ数据源
- 接入SAP数据源
- 接入SNMP数据源
- 接入SQL Server数据源
- 接入GaussDB(for MySQL)数据源
- 接入WebSocket数据源
- 接入自定义数据源
- 数据集成指导
- 服务集成指导
- 服务集成指导(旧版界面)
- 消息集成指导
- 设备集成指导
- 应用业务模型使用指导
- 扩大资源配额
- 查看审计日志
- 查看监控指标
- 权限管理
- 用户指南(新版)
- 最佳实践
-
开发指南
- 数据集成开发指导
-
服务集成开发指导
- 开发说明
- API调用认证开发(APP认证)
- API调用认证开发(IAM认证)
-
自定义后端开发(函数后端)
- 函数后端脚本开发说明
- AesUtils类说明
- APIConnectResponse类说明
- Base64Utils类说明
- CacheUtils类说明
- CipherUtils类说明
- ConnectionConfig类说明
- DataSourceClient类说明
- DataSourceConfig类说明
- ExchangeConfig类说明
- HttpClient类说明
- HttpConfig类说明
- JedisConfig类说明
- JSON2XMLHelper类说明
- JSONHelper类说明
- JsonUtils类说明
- JWTUtils类说明
- KafkaConsumer类说明
- KafkaProducer类说明
- KafkaConfig类说明
- MD5Encoder类说明
- Md5Utils类说明
- QueueConfig类说明
- RabbitMqConfig类说明
- RabbitMqProducer类说明
- RedisClient类说明
- RomaWebConfig类说明
- RSAUtils类说明
- SapRfcClient类说明
- SapRfcConfig类说明
- SoapClient类说明
- SoapConfig类说明
- StringUtils类说明
- TextUtils类说明
- XmlUtils类说明
- 自定义后端开发(数据后端)
- 后端服务签名校验开发
- 消息集成开发指导
- 设备集成开发指导
-
API参考
- 使用前必读
- API概览
- 如何调用API
- 公共资源API
- 数据集成API
- 服务集成API
- 消息集成API
- 设备集成API
- 应用示例
- 权限和授权项
- 附录
- 历史API
- 修订记录
- SDK参考
-
常见问题
- 实例管理
-
数据集成
-
数据集成普通任务
- FDI各类数据库支持哪些数据类型?
- 跟踪号是什么,能跟踪到数据吗?
- FDI任务是否支持清空目标表?
- FDI任务只能采集单张表到单张表吗?
- 用户创建的FDI任务,同一账号的其他用户可见吗?
- FDI通过公网对接其他租户的MRS HIVE如何配置?
- 从OBS解析文件到RDS数据库,采集过一次后,后面采集会进行更新吗?
- OBS源端的CSV文件解析到关系型数据库时,列的值不对怎么办?
- MRS Hive目标字段和源端字段数据类型不匹配时,数据是否能集成到目标端?
- MRS Hive、MRS HBase和MongoDB的Mapping映射手动输入时,是否区分大小写?
- MRS Hive是否支持分区?
- 源端API类型数据源自定义周期如何设置?
- SAP是否支持分页读取视图?
- 数据集成组合任务
-
数据集成普通任务
- 服务集成
- 消息集成
- 设备集成
-
故障排除
-
数据集成任务
- MRS Hive目标端写入时出现数据乱码
- MRS Hive写入时数据全部写在第一个字段里
- 目标端任务报任务运行超时
- MySQL到MRS Hive时目标端报“could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation”错误
- Mysql到Mysql时源端报“Illegal mix of collations for operation 'UNION'”错误
- 源端Mysql增量采集每小时执行一次时部分数据丢失
- API到MySQL时源端报“401 unauthorized”错误
- Kafka集到Mysql目标端报“cannot find record mapping field”错误
- API到MySQL的定时任务时会出现源端报“connect timeout”错误
- Kafka到Mysql的实时任务时,MQS中的Topic下有数据,但是FDI任务没有采集到数据。
- Mysql到Mysql的定时任务,源端有类型为tinyint(1),值为2的字段,但是采集到目标端值就变成了1
- 目标端数据源为公网Kafka时,定时任务目标端报“The task executes failed.Writer data to kafka failed”错误
- 数据集成组合任务
- 数据源
- 服务集成
- 设备集成
-
数据集成任务
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
使用ROMA Connect集成消息
概述
ROMA Connect提供了安全、标准化消息通道,实现不同消息系统的集成对接。
本章节通过完成一个使用Kafka命令行与ROMA Connect对接的配置样例,帮助您快速熟悉使用ROMA Connect集成消息的过程。
使用ROMA Connect集成消息的步骤如下所示:
准备工作
在开始操作前,您需要提前完成以下准备工作。
- ROMA Connect实例已绑定弹性IP,且本地PC所在网络与弹性IP的网络互通。
- 已在本地PC下载并安装Java JDK,并完成相关环境变量的配置。
- 根据ROMA Connect实例的Kafka版本,下载对应版本的开源Kafka命令行工具。您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。
说明:
ROMA Connect的Kafka服务端版本为1.1.0、2.7版本,请使用与服务端相同版本的Kafka命令行工具,避免出现不可预知的问题。
- 若ROMA Connect实例启用了“MQS SASL_SSL”,需要在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击“下载SSL证书”下载客户端证书。
步骤一:创建消息Topic
Topic是消息客户端与ROMA Connect间进行消息传输的通道,客户端通过Topic向ROMA Connect收发消息。
- 创建集成应用。
- 登录ROMA Connect控制台,在“实例”页面单击实例上的“查看控制台”,进入实例控制台。
- 在左侧的导航栏选择“集成应用”,单击页面右上角的“创建集成应用”。
- 在创建集成应用弹窗中填写集成应用的“名称”,然后单击“确认”。
- 创建消息Topic。
- 在左侧的导航栏选择“消息集成 MQS > Topic管理”,单击页面右上角的“创建Topic”。
- 在创建Topic弹窗中填写Topic相关配置信息,然后单击“确定”,创建Topic。
图1 创建Topic
表1 Topic配置 参数
配置说明
Topic名称
填写Topic的名称,根据规划自定义。建议您按照一定的命名规则填写Topic名称,方便您快速识别和查找。
集成应用
选择1中创建的集成应用。
权限
为Topic所属的集成应用选择对Topic的操作权限,此处选择“发布+订阅”,即该Topic可用于生产和消费消息。
分区数
合理设置分区数量,可以提升消息生产与消费的并发性能。为简单起见,此处使用默认值“3”。
副本数
ROMA Connect会自动在每个副本上备份数据,当其中一个副本故障时数据依然可用,Topic的副本数越多,可靠性越高。为简单起见,此处使用默认值“3”。
老化时间(小时)
超过老化时间后,Topic中存储的消息将会被删除。为简单起见,此处使用默认值“72”。
同步复制
客户端向Topic生产消息时,是否把消息复制给所有副本,然后才向消息客户端返回响应。为简单起见,此处不启用。
同步落盘
消息客户端向Topic生产的每条消息是否立即写入磁盘。为简单起见,此处不启用。
标签
添加Topic的标签信息,用于快速过滤和查找Topic。为简单起见,此处不添加标签。
过滤字段
添加Topic的消息过滤字段。若向Topic生成的消息中包含过滤字段,则包含过滤字段的整条消息内容会被屏蔽。为简单起见,此处不添加。
描述
填写Topic的描述信息。
步骤二:向Topic收发消息
在本地PC上使用Kafka命令行工具,通过命令行方式向Topic收发消息。
根据ROMA Connect实例是否开启SASL_SSL,向Topic收发消息的操作有所差异。若开启SASL_SSL访问,则客户端向Topic发送和接收的消息会加密传输,安全性更高。
- 解压Kafka命令行工具和客户端证书。
在本地PC找到已下载的Kafka命令行工具和客户端证书文件,并分别解压。
此处以Windows系统为例,并假设解压后的Kafka命令行工具路径为“D:\kafka_2.11-1.1.0”,客户端证书路径为“D:\cert”。
- (可选)修改Kafka命令行工具中的kafka-run-class.bat脚本文件。仅当使用1.1.0版本Kafka命令行工具时需要修改脚本文件,否则跳过此步骤。
在“D:\kafka_2.11-1.1.0\bin\windows”路径下找到kafka-run-class.bat文件,并在文件内容中的以下脚本行中,为“%CLASSPATH%”加上英文双引号,如下所示。
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
- (可选)修改Kafka命令行工具配置文件。仅当ROMA Connect实例已开启SASL_SSL时需要修改配置文件,否则跳过此步骤。
在“D:\kafka_2.11-1.1.0\config”路径下找到consumer.properties和producer.properties文件,并分别在文件中增加如下内容。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN security.protocol=SASL_SSL ssl.truststore.location=D:/cert/client.truststore.jks ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
其中:
- username和password的值分别为Topic所属集成应用的Key和Secret。您可以在ROMA Connect实例控制台的“集成应用”页面中,单击集成应用的名称查看并获取Key和Secret。
- ssl.truststore.location的值为1中解压得到的客户端证书的存放路径。
须知:
Windows系统下证书路径中必须使用“/”。
- 在命令行窗口执行以下命令进入Kafka命令行工具的目录。
d: cd kafka_2.11-1.1.0\bin\windows
- 向ROMA Connect生产消息。
- 执行以下命令,与Topic建立生产消息的连接。
ROMA Connect实例未开启SASL_SSL时执行的命令:
kafka-console-producer.bat --broker-list IP:9094,IP:9095,IP:9096 --topic TopicName
ROMA Connect实例已开启SASL_SSL时执行的命令:
kafka-console-producer.bat --broker-list IP:9095,IP:9096,IP:9097 --topic TopicName --producer.config ../../config/producer.properties
其中:
- IP为ROMA Connect的消息集成连接地址,可在ROMA Connect实例控制台的“实例信息”页面中查看“弹性IP地址”。
- TopicName为步骤一:创建消息Topic中创建Topic的名称。
- 输入消息内容,向Topic发送消息。
>Message1 >Message2 >Message3
其中,Message1、Message2、Message3为向Topic发送的实际消息内容,一行为一条消息。
- 在ROMA Connect实例控制台选择“消息集成MQS > 消息查询”,进入消息查询页面。
- 单击“高级搜索”展开高级搜索框。
- 输入搜索条件,然后单击“搜索”,查询客户端发送的消息记录。
- “Topic名称”选择步骤一:创建消息Topic中创建的消息Topic。
- “查询方式”选择“按生产时间查询”,并选择客户端向ROMA Connect发送消息的时间段。
- 单击消息记录后的“消息内容”,查看消息内容,确认是否与5.b中发送的内容一致。
图2 查看消息
- 执行以下命令,与Topic建立生产消息的连接。
- 从ROMA Connect消费消息。
- 执行以下命令,与Topic建立消费消息的连接并读取消息。
ROMA Connect实例未开启SASL_SSL时执行的命令:
kafka-console-consumer.bat --bootstrap-server IP:9094,IP:9095,IP:9096 --topic TopicName --from-beginning
ROMA Connect实例已开启SASL_SSL时执行的命令:
kafka-console-consumer.bat --bootstrap-server IP:9095,IP:9096,IP:9097 --topic TopicName --from-beginning --consumer.config ../../config/consumer.properties
其中:
- IP为ROMA Connect的消息集成连接地址,可在ROMA Connect实例控制台的“实例信息”页面中查看“弹性IP地址”。
- TopicName为步骤一:创建消息Topic中创建Topic的名称。
- 执行命令后,会持续连接Topic并读取消息。若要停止读取消息,按“Ctrl+C”,然后输入“Y”并回车,结束读取消息。
- 执行以下命令,与Topic建立消费消息的连接并读取消息。