应用与数据集成平台 ROMA Connect
应用与数据集成平台 ROMA Connect
- 最新动态
- 功能总览
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- 开始使用ROMA Connect
- 实例管理
- 集成应用管理
-
数据源管理
- ROMA Connect支持的数据源
- 接入API数据源
- 接入ActiveMQ数据源
- 接入ArtemisMQ数据源
- 接入DB2数据源
- 接入DIS数据源
- 接入DWS数据源
- 接入DM数据源
- 接入Gauss100数据源
- 接入FTP数据源
- 接入HL7数据源
- 接入HANA数据源
- 接入HIVE数据源
- 接入LDAP数据源
- 接入IBM MQ数据源
- 接入Kafka数据源
- 接入MySQL数据源
- 接入MongoDB数据源
- 接入MQS数据源
- 接入MRS Hive数据源
- 接入MRS HDFS数据源
- 接入MRS HBase数据源
- 接入MRS Kafka数据源
- 接入OBS数据源
- 接入Oracle数据源
- 接入PostgreSQL数据源
- 接入Redis数据源
- 接入RabbitMQ数据源
- 接入RocketMQ数据源
- 接入SAP数据源
- 接入SNMP数据源
- 接入SQL Server数据源
- 接入GaussDB(for MySQL)数据源
- 接入WebSocket数据源
- 接入自定义数据源
- 数据集成指导
- 服务集成指导
- 服务集成指导(旧版界面)
- 消息集成指导
- 设备集成指导
- 应用业务模型使用指导
- 扩大资源配额
- 查看审计日志
- 查看监控指标
- 权限管理
- 用户指南(新版)
- 最佳实践
-
开发指南
- 数据集成开发指导
-
服务集成开发指导
- 开发说明
- API调用认证开发(APP认证)
- API调用认证开发(IAM认证)
-
自定义后端开发(函数后端)
- 函数后端脚本开发说明
- AesUtils类说明
- APIConnectResponse类说明
- Base64Utils类说明
- CacheUtils类说明
- CipherUtils类说明
- ConnectionConfig类说明
- DataSourceClient类说明
- DataSourceConfig类说明
- ExchangeConfig类说明
- HttpClient类说明
- HttpConfig类说明
- JedisConfig类说明
- JSON2XMLHelper类说明
- JSONHelper类说明
- JsonUtils类说明
- JWTUtils类说明
- KafkaConsumer类说明
- KafkaProducer类说明
- KafkaConfig类说明
- MD5Encoder类说明
- Md5Utils类说明
- QueueConfig类说明
- RabbitMqConfig类说明
- RabbitMqProducer类说明
- RedisClient类说明
- RomaWebConfig类说明
- RSAUtils类说明
- SapRfcClient类说明
- SapRfcConfig类说明
- SoapClient类说明
- SoapConfig类说明
- StringUtils类说明
- TextUtils类说明
- XmlUtils类说明
- 自定义后端开发(数据后端)
- 后端服务签名校验开发
- 消息集成开发指导
- 设备集成开发指导
-
API参考
- 使用前必读
- API概览
- 如何调用API
- 公共资源API
- 数据集成API
- 服务集成API
- 消息集成API
- 设备集成API
- 应用示例
- 权限和授权项
- 附录
- 历史API
- 修订记录
- SDK参考
-
常见问题
- 实例管理
-
数据集成
-
数据集成普通任务
- FDI各类数据库支持哪些数据类型?
- 跟踪号是什么,能跟踪到数据吗?
- FDI任务是否支持清空目标表?
- FDI任务只能采集单张表到单张表吗?
- 用户创建的FDI任务,同一账号的其他用户可见吗?
- FDI通过公网对接其他租户的MRS HIVE如何配置?
- 从OBS解析文件到RDS数据库,采集过一次后,后面采集会进行更新吗?
- OBS源端的CSV文件解析到关系型数据库时,列的值不对怎么办?
- MRS Hive目标字段和源端字段数据类型不匹配时,数据是否能集成到目标端?
- MRS Hive、MRS HBase和MongoDB的Mapping映射手动输入时,是否区分大小写?
- MRS Hive是否支持分区?
- 源端API类型数据源自定义周期如何设置?
- SAP是否支持分页读取视图?
- 数据集成组合任务
-
数据集成普通任务
- 服务集成
- 消息集成
- 设备集成
-
故障排除
-
数据集成任务
- MRS Hive目标端写入时出现数据乱码
- MRS Hive写入时数据全部写在第一个字段里
- 目标端任务报任务运行超时
- MySQL到MRS Hive时目标端报“could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation”错误
- Mysql到Mysql时源端报“Illegal mix of collations for operation 'UNION'”错误
- 源端Mysql增量采集每小时执行一次时部分数据丢失
- API到MySQL时源端报“401 unauthorized”错误
- Kafka集到Mysql目标端报“cannot find record mapping field”错误
- API到MySQL的定时任务时会出现源端报“connect timeout”错误
- Kafka到Mysql的实时任务时,MQS中的Topic下有数据,但是FDI任务没有采集到数据。
- Mysql到Mysql的定时任务,源端有类型为tinyint(1),值为2的字段,但是采集到目标端值就变成了1
- 目标端数据源为公网Kafka时,定时任务目标端报“The task executes failed.Writer data to kafka failed”错误
- 数据集成组合任务
- 数据源
- 服务集成
- 设备集成
-
数据集成任务
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
Java Demo使用说明
操作场景
除了前面章节介绍的使用原生Kafka客户端,MQS实例还可以通过HTTP RESTful方式访问,包括向指定Topic发送消息、消费消息以及确认消费。
这种方式主要用于适配原有业务系统架构,方便统一使用HTTP协议接入。
入门版规格实例不支持RESTful API方式连接MQS。
操作流程
前提条件
本指南提供了Java语言的RESTful API请求发送示例。示例为一个在IntelliJ IDEA工具中开发的Maven工程,因此,您如果在本地环境使用,请先安装并配置以下环境(以Windows 10系统为例):
- 获取Demo。
在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上角的“下载示例 > 下载RESTful API Java Demo包”下载Demo。
导入工程
- 打开IntelliJ IDEA,在菜单栏选择“Import Project”。
- 在弹出的对话框中选择解压后的RESTful API Java Demo路径,单击“OK”。
- “Import project from external model”选择“Eclipse”,单击“Next”,进入下一页后保持默认连续单击“Next”,直到“Please select project SDK”页面。
图1 Import Project
- 单击“Finish”,完成工程导入。
图2 Finish
- 编辑rest-config.properties
文件在src/main/resources目录下。将获取到的MQS实例连接地址、Topic名称,以及SASL信息填写到下述配置中。其中参数kafka.rest.group为消费组ID,可在客户端指定。
# Kafka rest endpoint. kafka.rest.endpoint=https://{MQS_Instance_IP_Addr}:9292 # Kafka topic name. kafka.rest.topic=topic_name_demo # Kafka consume group. kafka.rest.group=group_id_demo # Kafka sasl username. kafka.rest.username=sasl_username_demo # Kafka sasl password. kafka.rest.password=sasl_user_passwd_demo
- 编辑log4j.properties
log.directory=D://workspace/logs
- 运行示例工程,查看消息生产与消费样例。
示例代码解读
- 工程入口:
public class RestMain { private static final Logger LOGGER = LoggerFactory.getLogger(RestMain.class); public static void main(String[] args) throws InterruptedException { //初始化请求对象。在RestServiceImpl类文件中还包含RESTful API组装,以及对请求签名 IRestService restService = new RestServiceImpl(); Base64.Decoder decoder = Base64.getDecoder(); //以下分别为生产消息、消费消息与消费确认 // Produce message ProduceReq messages = new ProduceReq(); messages.addMessage("{[{'id': '00001', 'name': 'John'}, {'id': '00002', 'name': 'Mike'}]}").addMessage("Kafka rest client demo!"); LOGGER.debug("produce message: {}", JsonUtils.convertObject2Str(messages)); restService.produce(messages); // Consume message List<ConsumeResp> consumeResps = restService.consume(); CommitReq commitReq = new CommitReq(); consumeResps.forEach(resp -> { LOGGER.debug("handler: {}, content: {}", resp.getHandler(), new String(decoder.decode(resp.getMessage().getContent()))); commitReq.addCommit(resp.getHandler()); }); // Commit message if (commitReq.getMessages().size() != 0) { CommitResp resp = restService.commit(commitReq); LOGGER.info("Commit resp: success: {}, failed: {}", resp.getSuccess(), resp.getFail()); } else { LOGGER.warn("Commit is empty."); } } }
- 消息组装与发送:
以生产消息为例,在下述方法中完成消息组装和签名。其中签名方法调用后,会返回两个消息头:Authorization和X-Sdk-Date,Authorization包含了对请求内容的签名信息。消息头的另一个参数Content-Type需要在代码中添加,参考示例的createRequest()方法。
public List<ProduceResp> produce(ProduceReq messages) { List<ProduceResp> prodResp = null; try { Request request = createRequest(); request.setUrl(produceURI); request.setMethod("POST"); request.setBody(JsonUtils.convertObject2Str(messages)); //对请求内容签名,签名后,请求头部参数会新增两个参数:Authorization和X-Sdk-Date,Authorization包含了对请求内容的签名信息。 HttpRequestBase signedRequest = Client.sign(request); LOGGER.debug("Request uri: {}, headers: {}", signedRequest.getURI(), signedRequest.getAllHeaders()); LOGGER.debug("Request body: {}", request.getBody()); HttpResponse response = HttpUtils.execute(signedRequest); if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED) { String jsonStr = EntityUtils.toString(response.getEntity(), "UTF-8"); prodResp = JsonUtils.convertStr2ListObject(jsonStr, new TypeReference<List<ProduceResp>>() { }); LOGGER.info("Produce response: {}", jsonStr); return prodResp; } else { LOGGER.error("Produce message failed. statusCode: {}, error msg: {}", response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8")); } } catch (Exception e) { LOGGER.error("Produce message failed."); } return prodResp; }
父主题: MQS连接开发(RESTful API)