应用与数据集成平台 ROMA Connect
应用与数据集成平台 ROMA Connect
- 最新动态
- 功能总览
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- 开始使用ROMA Connect
- 实例管理
- 集成应用管理
-
数据源管理
- ROMA Connect支持的数据源
- 接入API数据源
- 接入ActiveMQ数据源
- 接入ArtemisMQ数据源
- 接入DB2数据源
- 接入DIS数据源
- 接入DWS数据源
- 接入DM数据源
- 接入Gauss100数据源
- 接入FTP数据源
- 接入HL7数据源
- 接入HANA数据源
- 接入HIVE数据源
- 接入LDAP数据源
- 接入IBM MQ数据源
- 接入Kafka数据源
- 接入MySQL数据源
- 接入MongoDB数据源
- 接入MQS数据源
- 接入MRS Hive数据源
- 接入MRS HDFS数据源
- 接入MRS HBase数据源
- 接入MRS Kafka数据源
- 接入OBS数据源
- 接入Oracle数据源
- 接入PostgreSQL数据源
- 接入Redis数据源
- 接入RabbitMQ数据源
- 接入RocketMQ数据源
- 接入SAP数据源
- 接入SNMP数据源
- 接入SQL Server数据源
- 接入GaussDB(for MySQL)数据源
- 接入WebSocket数据源
- 接入自定义数据源
- 数据集成指导
- 服务集成指导
- 服务集成指导(旧版界面)
- 消息集成指导
- 设备集成指导
- 应用业务模型使用指导
- 扩大资源配额
- 查看审计日志
- 查看监控指标
- 权限管理
- 用户指南(新版)
- 最佳实践
-
开发指南
- 数据集成开发指导
-
服务集成开发指导
- 开发说明
- API调用认证开发(APP认证)
- API调用认证开发(IAM认证)
-
自定义后端开发(函数后端)
- 函数后端脚本开发说明
- AesUtils类说明
- APIConnectResponse类说明
- Base64Utils类说明
- CacheUtils类说明
- CipherUtils类说明
- ConnectionConfig类说明
- DataSourceClient类说明
- DataSourceConfig类说明
- ExchangeConfig类说明
- HttpClient类说明
- HttpConfig类说明
- JedisConfig类说明
- JSON2XMLHelper类说明
- JSONHelper类说明
- JsonUtils类说明
- JWTUtils类说明
- KafkaConsumer类说明
- KafkaProducer类说明
- KafkaConfig类说明
- MD5Encoder类说明
- Md5Utils类说明
- QueueConfig类说明
- RabbitMqConfig类说明
- RabbitMqProducer类说明
- RedisClient类说明
- RomaWebConfig类说明
- RSAUtils类说明
- SapRfcClient类说明
- SapRfcConfig类说明
- SoapClient类说明
- SoapConfig类说明
- StringUtils类说明
- TextUtils类说明
- XmlUtils类说明
- 自定义后端开发(数据后端)
- 后端服务签名校验开发
- 消息集成开发指导
- 设备集成开发指导
-
API参考
- 使用前必读
- API概览
- 如何调用API
- 公共资源API
- 数据集成API
- 服务集成API
- 消息集成API
- 设备集成API
- 应用示例
- 权限和授权项
- 附录
- 历史API
- 修订记录
- SDK参考
-
常见问题
- 实例管理
-
数据集成
-
数据集成普通任务
- FDI各类数据库支持哪些数据类型?
- 跟踪号是什么,能跟踪到数据吗?
- FDI任务是否支持清空目标表?
- FDI任务只能采集单张表到单张表吗?
- 用户创建的FDI任务,同一账号的其他用户可见吗?
- FDI通过公网对接其他租户的MRS HIVE如何配置?
- 从OBS解析文件到RDS数据库,采集过一次后,后面采集会进行更新吗?
- OBS源端的CSV文件解析到关系型数据库时,列的值不对怎么办?
- MRS Hive目标字段和源端字段数据类型不匹配时,数据是否能集成到目标端?
- MRS Hive、MRS HBase和MongoDB的Mapping映射手动输入时,是否区分大小写?
- MRS Hive是否支持分区?
- 源端API类型数据源自定义周期如何设置?
- SAP是否支持分页读取视图?
- 数据集成组合任务
-
数据集成普通任务
- 服务集成
- 消息集成
- 设备集成
-
故障排除
-
数据集成任务
- MRS Hive目标端写入时出现数据乱码
- MRS Hive写入时数据全部写在第一个字段里
- 目标端任务报任务运行超时
- MySQL到MRS Hive时目标端报“could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation”错误
- Mysql到Mysql时源端报“Illegal mix of collations for operation 'UNION'”错误
- 源端Mysql增量采集每小时执行一次时部分数据丢失
- API到MySQL时源端报“401 unauthorized”错误
- Kafka集到Mysql目标端报“cannot find record mapping field”错误
- API到MySQL的定时任务时会出现源端报“connect timeout”错误
- Kafka到Mysql的实时任务时,MQS中的Topic下有数据,但是FDI任务没有采集到数据。
- Mysql到Mysql的定时任务,源端有类型为tinyint(1),值为2的字段,但是采集到目标端值就变成了1
- 目标端数据源为公网Kafka时,定时任务目标端报“The task executes failed.Writer data to kafka failed”错误
- 数据集成组合任务
- 数据源
- 服务集成
- 设备集成
-
数据集成任务
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
自定义数据源开发示例(定时任务)
操作场景
FDI当前支持接入MySQL类型数据库,但因为MySQL是最常用的数据库类型,可以方便开发者参照理解,故本章节以MySQL类型自定义连接器为例进行开发,示例使用Java语言进行开发,Demo代码参考MysqlConnctor.rar。
前提条件
- 准备装有1.8及以上版本JDK的Linux服务器。
- IntelliJ IDEA版本为:2018.3.5或以上版本,Eclipse版本为:3.6.0或以上版本。
- 通过Demo(sha256:34c9bc8d99eba4ed193603019ce2b69afa3ed760a452231ece3c89fd7dd74da1)获取MysqlConnctor.rar包。
- 如果使用自定义连接器来写数据,需要用户自行保证数据的可重复幂等写入。
- RESTful接口单次请求的处理时间不能超过60s。
- FDI端会循环调用RESTful接口地址,直到读完数据为止。
操作步骤
- 创建SpringBoot模板工程。
@SpringBootApplication public class MysqlConnectorApplication { public static void main(String[] args) { SpringApplication.run(MysqlConnectorApplication.class, args); } }
- 定义RESTful接口Controller层。
@RequestMapping(value = "mysql/reader", method = RequestMethod.POST) public ReaderResponseBody send(@RequestBody ReaderRequestBody readerRequestBody) throws Exception { if (readerRequestBody == null) { throw new RuntimeException("The reader request body is empty"); } LOGGER.info("Accept a reader request, request={}", JSONObject.toJSONString(readerRequestBody)); MysqlConfig mysqlConfig = getAndCheckMysqlConfig(readerRequestBody.getDatasource()); String jdbcUrl = buildMysqlUrl(mysqlConfig); JSONArray dataList = mysqlReaderService.queryData(jdbcUrl, mysqlConfig, readerRequestBody.getParams()); ReaderResponseBody readerResponseBody = new ReaderResponseBody(); readerResponseBody.setDatas(dataList); return readerResponseBody; }
- 实现读写接口服务层。
@Service public class MysqlReaderService { public JSONArray queryData(String jdbcUrl, MysqlConfig mysqlConfig, ReaderParams readerParams) throws Exception { Connection conn = DBUtils.getConn(jdbcUrl, mysqlConfig); //获取分页参数 int limit = 0; int offset = 0; if (readerParams.getPagination() != null) { Pagination pagination = readerParams.getPagination(); limit = pagination.getLimit() == 0 ? 10 : pagination.getLimit(); offset = pagination.getOffset() == 0 ? 1 : pagination.getOffset(); } //获取要读取的表名 String tableName = readerParams.getExtend().getString("table_name"); //组装SQL StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("select * from ").append(tableName); sqlBuilder.append(" limit ?,? "); PreparedStatement preparedStatement = conn.prepareStatement(sqlBuilder.toString()); preparedStatement.setInt(1, (offset - 1) * limit); preparedStatement.setInt(2, limit); ResultSet resultSet = preparedStatement.executeQuery(); //获取列名 List<String> columnList = getColumnInfo(resultSet); //读取查询数据 JSONArray dataArray = new JSONArray(); while (resultSet.next()) { JSONObject data = new JSONObject(); for (int i = 1; i <= columnList.size(); i++) { data.put(columnList.get(i - 1), resultSet.getString(i)); } dataArray.add(data); } return dataArray; } }
- 定义读写接口的入参和出参结构体。
public class ReaderRequestBody { private String job_name; private JSONObject datasource; private ReaderParams params; }
- 在根目录下执行以下命令,执行成功后会在MysqlConnector\target目录下生成可运行jar包,例如MysqlConnector-1.0-SNAPSHOT.jar。
- 通过Linux或Windows将生成的jar包MysqlConnector-1.0-SNAPSHOT.jar上传到装有JDK环境的用户服务器上,执行以下命令运行即可。
# java -jar MysqlConnector-1.0-SNAPSHOT.jar &
说明:
在开发调试阶段,也可以直接基于Intellij idea或Eclipse工具启动,只要启动MysqlConnectorApplication.java类即可。
- 创建自定义连接器模型。
- 登录ROMA Connect控制台,在左侧的导航栏选择“资产管理”。
- 单击页面右上角的“创建连接器”,并参考创建连接器章节配置连接器信息。
以MySQL为例,数据源定义中主要填写主机名、端口、数据库名、用户名以及密码。
图1 连接器配置1读写参数定义中,填写自定义插件执行读写操作时需要获取的额外信息,如要读写的表名、可以进行增量读取的时间戳字段列名等。
图2 连接器配置2
- 发布连接器
说明:
连接器和连接器实例的关系,类似编程中的类和类对象实例。
连接器定义了针对某种数据源的规范,连接器实例则对应了具体的RESTful服务,所以需要指定RESTful连接地址 (由部署的用户服务器地址决定)。
图3 发布连接器 - 接入自定义数据源。
- 在左侧的导航栏选择“数据源管理”,单击页面右上角的“接入数据源”。
- 在接入数据源页面的“自定义数据源”页签下,选择创建好的自定义连接器“MysqlConnector”,然后单击“下一步”。
- 在页面中配置数据源的连接信息,连接实例选择该连接器下的一个实例,并填入此连接器定义的数据源相关信息。
- 以自定义数据源作为源端,MySQL作为目标端为例创建定时任务。
参考创建数据集成任务(普通任务)接入源端自定义数据源和目标端MySQL数据源,并创建定时任务。完成后运行任务,可以将源端自定义的数据源中的数据,迁移到MySQL数据源的表中。
说明:
执行任务后,FDI会根据自定义连接实例定义的连接地址(http://127.0.0.1:19091/mysql)去读取或写入数据,读接口加后缀/reader,写接口加后缀/writer。
父主题: 数据集成开发指导