自定义数据源开发示例(定时任务)
操作场景
FDI当前支持接入MySQL类型数据库,但因为MySQL是最常用的数据库类型,可以方便开发者参照理解,故本章节以MySQL类型自定义连接器为例进行开发,示例使用Java语言进行开发,Demo代码参考MysqlConnctor.rar。
前提条件
- 准备装有1.8及以上版本JDK的Linux服务器。
- IntelliJ IDEA版本为:2018.3.5或以上版本,Eclipse版本为:3.6.0或以上版本。
- 通过Demo(sha256:34c9bc8d99eba4ed193603019ce2b69afa3ed760a452231ece3c89fd7dd74da1)获取MysqlConnctor.rar包。
- 如果使用自定义连接器来写数据,需要用户自行保证数据的可重复幂等写入。
- RESTful接口单次请求的处理时间不能超过60s。
- FDI端会循环调用RESTful接口地址,直到读完数据为止。
操作步骤
- 创建SpringBoot模板工程。
@SpringBootApplication public class MysqlConnectorApplication { public static void main(String[] args) { SpringApplication.run(MysqlConnectorApplication.class, args); } }
- 定义RESTful接口Controller层。
@RequestMapping(value = "mysql/reader", method = RequestMethod.POST) public ReaderResponseBody send(@RequestBody ReaderRequestBody readerRequestBody) throws Exception { if (readerRequestBody == null) { throw new RuntimeException("The reader request body is empty"); } LOGGER.info("Accept a reader request, request={}", JSONObject.toJSONString(readerRequestBody)); MysqlConfig mysqlConfig = getAndCheckMysqlConfig(readerRequestBody.getDatasource()); String jdbcUrl = buildMysqlUrl(mysqlConfig); JSONArray dataList = mysqlReaderService.queryData(jdbcUrl, mysqlConfig, readerRequestBody.getParams()); ReaderResponseBody readerResponseBody = new ReaderResponseBody(); readerResponseBody.setDatas(dataList); return readerResponseBody; }
- 实现读写接口服务层。
@Service public class MysqlReaderService { public JSONArray queryData(String jdbcUrl, MysqlConfig mysqlConfig, ReaderParams readerParams) throws Exception { Connection conn = DBUtils.getConn(jdbcUrl, mysqlConfig); //获取分页参数 int limit = 0; int offset = 0; if (readerParams.getPagination() != null) { Pagination pagination = readerParams.getPagination(); limit = pagination.getLimit() == 0 ? 10 : pagination.getLimit(); offset = pagination.getOffset() == 0 ? 1 : pagination.getOffset(); } //获取要读取的表名 String tableName = readerParams.getExtend().getString("table_name"); //组装SQL StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("select * from ").append(tableName); sqlBuilder.append(" limit ?,? "); PreparedStatement preparedStatement = conn.prepareStatement(sqlBuilder.toString()); preparedStatement.setInt(1, (offset - 1) * limit); preparedStatement.setInt(2, limit); ResultSet resultSet = preparedStatement.executeQuery(); //获取列名 List<String> columnList = getColumnInfo(resultSet); //读取查询数据 JSONArray dataArray = new JSONArray(); while (resultSet.next()) { JSONObject data = new JSONObject(); for (int i = 1; i <= columnList.size(); i++) { data.put(columnList.get(i - 1), resultSet.getString(i)); } dataArray.add(data); } return dataArray; } }
- 定义读写接口的入参和出参结构体。
public class ReaderRequestBody { private String job_name; private JSONObject datasource; private ReaderParams params; }
- 在根目录下执行以下命令,执行成功后会在MysqlConnector\target目录下生成可运行jar包,例如MysqlConnector-1.0-SNAPSHOT.jar。
- 通过Linux或Windows将生成的jar包MysqlConnector-1.0-SNAPSHOT.jar上传到装有JDK环境的用户服务器上,执行以下命令运行即可。
# java -jar MysqlConnector-1.0-SNAPSHOT.jar &
在开发调试阶段,也可以直接基于Intellij idea或Eclipse工具启动,只要启动MysqlConnectorApplication.java类即可。
- 创建自定义连接器模型。
- 登录ROMA Connect控制台,在左侧的导航栏选择“资产管理”。
- 单击页面右上角的“创建连接器”,并参考创建连接器章节配置连接器信息。
以MySQL为例,数据源定义中主要填写主机名、端口、数据库名、用户名以及密码。
图1 连接器配置1
读写参数定义中,填写自定义插件执行读写操作时需要获取的额外信息,如要读写的表名、可以进行增量读取的时间戳字段列名等。
图2 连接器配置2
- 发布连接器
连接器和连接器实例的关系,类似编程中的类和类对象实例。
连接器定义了针对某种数据源的规范,连接器实例则对应了具体的RESTful服务,所以需要指定RESTful连接地址 (由部署的用户服务器地址决定)。
图3 发布连接器
- 接入自定义数据源。
- 在左侧的导航栏选择“数据源管理”,单击页面右上角的“接入数据源”。
- 在接入数据源页面的“自定义数据源”页签下,选择创建好的自定义连接器“MysqlConnector”,然后单击“下一步”。
- 在页面中配置数据源的连接信息,连接实例选择该连接器下的一个实例,并填入此连接器定义的数据源相关信息。
- 以自定义数据源作为源端,MySQL作为目标端为例创建定时任务。
参考创建数据集成任务(普通任务)接入源端自定义数据源和目标端MySQL数据源,并创建定时任务。完成后运行任务,可以将源端自定义的数据源中的数据,迁移到MySQL数据源的表中。
执行任务后,FDI会根据自定义连接实例定义的连接地址(http://127.0.0.1:19091/mysql)去读取或写入数据,读接口加后缀/reader,写接口加后缀/writer。