更新时间:2024-08-26 GMT+08:00
分享

自定义数据源开发示例(定时任务)

操作场景

FDI当前支持接入MySQL类型数据库,但因为MySQL是最常用的数据库类型,可以方便开发者参照理解,故本章节以MySQL类型自定义连接器为例进行开发,示例使用Java语言进行开发,Demo代码参考MysqlConnctor.rar。

前提条件

  • 准备装有1.8及以上版本JDK的Linux服务器。
  • IntelliJ IDEA版本为:2018.3.5或以上版本,Eclipse版本为:3.6.0或以上版本。
  • 通过Demo(sha256:34c9bc8d99eba4ed193603019ce2b69afa3ed760a452231ece3c89fd7dd74da1)获取MysqlConnctor.rar包。
  • 如果使用自定义连接器来写数据,需要用户自行保证数据的可重复幂等写入。
  • RESTful接口单次请求的处理时间不能超过60s。
  • FDI端会循环调用RESTful接口地址,直到读完数据为止。

操作步骤

  1. 创建SpringBoot模板工程。

    示例代码:

    @SpringBootApplication
    public class MysqlConnectorApplication {
        public static void main(String[] args) {
            SpringApplication.run(MysqlConnectorApplication.class, args);
        }
    }
  2. 定义RESTful接口Controller层。

    示例代码:

    @RequestMapping(value = "mysql/reader", method = RequestMethod.POST)
        public ReaderResponseBody send(@RequestBody ReaderRequestBody readerRequestBody) throws Exception {
            if (readerRequestBody == null) {
                throw new RuntimeException("The reader request body is empty");
            }
            LOGGER.info("Accept a reader request, request={}", JSONObject.toJSONString(readerRequestBody));
     
            MysqlConfig mysqlConfig = getAndCheckMysqlConfig(readerRequestBody.getDatasource());
            String jdbcUrl = buildMysqlUrl(mysqlConfig);
            JSONArray dataList = mysqlReaderService.queryData(jdbcUrl, mysqlConfig, readerRequestBody.getParams());
     
            ReaderResponseBody readerResponseBody = new ReaderResponseBody();
            readerResponseBody.setDatas(dataList);
            return readerResponseBody;
        }
  3. 实现读写接口服务层。

    示例代码:

    @Service
    public class MysqlReaderService {
        public JSONArray queryData(String jdbcUrl, MysqlConfig mysqlConfig, ReaderParams readerParams) throws Exception {
            Connection conn = DBUtils.getConn(jdbcUrl, mysqlConfig);
            //获取分页参数
            int limit = 0;
            int offset = 0;
            if (readerParams.getPagination() != null) {
                Pagination pagination = readerParams.getPagination();
                limit = pagination.getLimit() == 0 ? 10 : pagination.getLimit();
                offset = pagination.getOffset() == 0 ? 1 : pagination.getOffset();
            }
            //获取要读取的表名
            String tableName = readerParams.getExtend().getString("table_name");
     
            //组装SQL
            StringBuilder sqlBuilder = new StringBuilder();
            sqlBuilder.append("select * from ").append(tableName);
    sqlBuilder.append(" limit ?,? ");
            PreparedStatement preparedStatement = conn.prepareStatement(sqlBuilder.toString());
            preparedStatement.setInt(1, (offset - 1) * limit);
            preparedStatement.setInt(2, limit);
            ResultSet resultSet = preparedStatement.executeQuery();
     
            //获取列名
            List<String> columnList = getColumnInfo(resultSet);
            //读取查询数据
            JSONArray dataArray = new JSONArray();
            while (resultSet.next()) {
                JSONObject data = new JSONObject();
                for (int i = 1; i <= columnList.size(); i++) {
                    data.put(columnList.get(i - 1), resultSet.getString(i));
                }
                dataArray.add(data);
            }
            return dataArray;
        }
    }
  4. 定义读写接口的入参和出参结构体。

    示例代码:

    public class ReaderRequestBody {
        private String job_name;
     
        private JSONObject datasource;
     
        private ReaderParams params;
    }
  5. 在根目录下执行以下命令,执行成功后会在MysqlConnector\target目录下生成可运行jar包,例如MysqlConnector-1.0-SNAPSHOT.jar。

    # mvn package

  6. 通过Linux或Windows将生成的jar包MysqlConnector-1.0-SNAPSHOT.jar上传到装有JDK环境的用户服务器上,执行以下命令运行即可。

    # java -jar MysqlConnector-1.0-SNAPSHOT.jar &

    在开发调试阶段,也可以直接基于Intellij idea或Eclipse工具启动,只要启动MysqlConnectorApplication.java类即可。

  7. 创建自定义连接器模型。
    1. 登录ROMA Connect控制台,在左侧的导航栏选择“资产管理”。
    2. 单击页面右上角的“创建连接器”,并参考创建连接器章节配置连接器信息。

      以MySQL为例,数据源定义中主要填写主机名、端口、数据库名、用户名以及密码。

      图1 连接器配置1

      读写参数定义中,填写自定义插件执行读写操作时需要获取的额外信息,如要读写的表名、可以进行增量读取的时间戳字段列名等。

      图2 连接器配置2
  8. 发布连接器

    连接器创建完成后,单击发布,可以发布连接器的实例。

    连接器和连接器实例的关系,类似编程中的类和类对象实例。

    连接器定义了针对某种数据源的规范,连接器实例则对应了具体的RESTful服务,所以需要指定RESTful连接地址 (由部署的用户服务器地址决定)。

    图3 发布连接器
  9. 接入自定义数据源。
    1. 在左侧的导航栏选择“数据源管理”,单击页面右上角的“接入数据源”。
    2. 在接入数据源页面的“自定义数据源”页签下,选择创建好的自定义连接器“MysqlConnector”,然后单击“下一步”。
    3. 在页面中配置数据源的连接信息,连接实例选择该连接器下的一个实例,并填入此连接器定义的数据源相关信息。
  10. 以自定义数据源作为源端,MySQL作为目标端为例创建定时任务。
    参考创建数据集成任务(普通任务)接入源端自定义数据源和目标端MySQL数据源,并创建定时任务。完成后运行任务,可以将源端自定义的数据源中的数据,迁移到MySQL数据源的表中。

    执行任务后,FDI会根据自定义连接实例定义的连接地址(http://127.0.0.1:19091/mysql)去读取或写入数据,读接口加后缀/reader,写接口加后缀/writer。

相关文档