更新时间:2024-10-10 GMT+08:00

自定义数据源开发示例(实时任务)

操作场景

对于实时数据源,当前自定义连接器不支持直接接入,只能通过消息中转的方式进行接入。本章节以MQS数据源类型为例进行开发,示例使用Java语言进行开发,Demo代码参考RealtimeConnector.rar。

前提条件

  • 准备装有1.8及以上版本JDK的Linux服务器。
  • IntelliJ IDEA版本为:2018.3.5或以上版本,Eclipse版本为:3.6.0或以上版本。
  • 通过Demo(sha256:34c9bc8d99eba4ed193603019ce2b69afa3ed760a452231ece3c89fd7dd74da1)获取RealtimeConnector.rar包。
  • 用户程序向MQS写入消息的TPS不能超过6000/s。

操作步骤

  1. 创建SpringBoot模板工程,在Main方法中启动对实时数据源的消费,对消费的数据使用MQS SDK生产到MQS中。

    示例代码:

    @SpringBootApplication
    public class RealtimeConnectorApplication {
        private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConnectorApplication.class);
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer rocketMQConsumer = createRocketMQConsumer();
            MqsProducer mqsProducer = new MqsProducer();
            MessageListenerConcurrently rocketmqMessageListener = new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageList,
                    ConsumeConcurrentlyContext context) {
                    for (MessageExt message : messageList) {
                        String jsonString = convertMessageToJsonString(message);
                        //将JSON格式的数据写到MQS,mqs-topic为在MQS中创建的Topic,后续FDI任务消费此Topic
                        mqsProducer.produce("mqs-topic", jsonString);
                    }
                    LOGGER.info("Success to process {} data", messageList.size());
                    //标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            };
            //注册回调实现类来处理从RocketMQ拉取回来的消息
            rocketMQConsumer.registerMessageListener(rocketmqMessageListener);
            //启动RocketMQ的消费
            rocketMQConsumer.start();
        }
     
        private static DefaultMQPushConsumer createRocketMQConsumer() throws MQClientException {
            //实例化RocketMQ消费者,请填写实际消费组名称
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCompanyGroup");
            //设置NameServer的地址,请填写实际地址
            consumer.setNamesrvAddr("localhost:9876");
            //订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe("RocketMQTopic", "*");
            return consumer;
        }
     
        /**
         * 将RocketMQ读取出来的消息转换成JSON格式的字符串,实际转换需根据RocketMQ的消息内容来实现
         *
         * @param messageExt
         * @return
         */
        private static String convertMessageToJsonString(MessageExt messageExt) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", 1);
            jsonObject.put("name", "zhangsan");
            return jsonObject.toJSONString();
        }
    }
  2. 在根目录下执行以下命令,执行成功后会在RealtimeConnector\target目录下生成可运行jar包,例如RealtimeConnector-1.0-SNAPSHOT.jar。

    # mvn package

  3. 通过Linux或Windows将生成的jar包RealtimeConnector-1.0-SNAPSHOT.jar上传到装有JDK环境的用户服务器上,执行以下命令运行即可。

    # java -jar RealtimeConnector-1.0-SNAPSHOT.jar &

  4. 以MQS数据源作为源端,MySQL作为目标端为例创建实时任务。

    参考创建数据集成任务(普通任务)接入源端MQS数据源和目标端MySQL数据源,并创建实时任务。完成后运行任务,可以将源端MQS数据源中的数据,迁移到MySQL数据源的表中。