更新时间:2024-10-10 GMT+08:00
自定义数据源开发示例(实时任务)
操作场景
对于实时数据源,当前自定义连接器不支持直接接入,只能通过消息中转的方式进行接入。本章节以MQS数据源类型为例进行开发,示例使用Java语言进行开发,Demo代码参考RealtimeConnector.rar。
前提条件
- 准备装有1.8及以上版本JDK的Linux服务器。
- IntelliJ IDEA版本为:2018.3.5或以上版本,Eclipse版本为:3.6.0或以上版本。
- 通过Demo(sha256:34c9bc8d99eba4ed193603019ce2b69afa3ed760a452231ece3c89fd7dd74da1)获取RealtimeConnector.rar包。
- 用户程序向MQS写入消息的TPS不能超过6000/s。
操作步骤
- 创建SpringBoot模板工程,在Main方法中启动对实时数据源的消费,对消费的数据使用MQS SDK生产到MQS中。
@SpringBootApplication public class RealtimeConnectorApplication { private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConnectorApplication.class); public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer rocketMQConsumer = createRocketMQConsumer(); MqsProducer mqsProducer = new MqsProducer(); MessageListenerConcurrently rocketmqMessageListener = new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageList, ConsumeConcurrentlyContext context) { for (MessageExt message : messageList) { String jsonString = convertMessageToJsonString(message); //将JSON格式的数据写到MQS,mqs-topic为在MQS中创建的Topic,后续FDI任务消费此Topic mqsProducer.produce("mqs-topic", jsonString); } LOGGER.info("Success to process {} data", messageList.size()); //标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }; //注册回调实现类来处理从RocketMQ拉取回来的消息 rocketMQConsumer.registerMessageListener(rocketmqMessageListener); //启动RocketMQ的消费 rocketMQConsumer.start(); } private static DefaultMQPushConsumer createRocketMQConsumer() throws MQClientException { //实例化RocketMQ消费者,请填写实际消费组名称 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCompanyGroup"); //设置NameServer的地址,请填写实际地址 consumer.setNamesrvAddr("localhost:9876"); //订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("RocketMQTopic", "*"); return consumer; } /** * 将RocketMQ读取出来的消息转换成JSON格式的字符串,实际转换需根据RocketMQ的消息内容来实现 * * @param messageExt * @return */ private static String convertMessageToJsonString(MessageExt messageExt) { JSONObject jsonObject = new JSONObject(); jsonObject.put("id", 1); jsonObject.put("name", "zhangsan"); return jsonObject.toJSONString(); } }
- 在根目录下执行以下命令,执行成功后会在RealtimeConnector\target目录下生成可运行jar包,例如RealtimeConnector-1.0-SNAPSHOT.jar。
- 通过Linux或Windows将生成的jar包RealtimeConnector-1.0-SNAPSHOT.jar上传到装有JDK环境的用户服务器上,执行以下命令运行即可。
- 以MQS数据源作为源端,MySQL作为目标端为例创建实时任务。
参考创建数据集成任务(普通任务)接入源端MQS数据源和目标端MySQL数据源,并创建实时任务。完成后运行任务,可以将源端MQS数据源中的数据,迁移到MySQL数据源的表中。
父主题: 数据集成开发指导