Updated on 2023-05-17 GMT+08:00

(Example) Developing a Custom Data Source for a Real-Time Task

Scenarios

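A custom connector can access a real-time data source through message forwarding: a user program consumes data from the real-time source and writes it to an MQS topic, which an FDI task then consumes. This section uses an MQS data source and a Java program as an example. For the demo code, see RealtimeConnector.rar.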

Prerequisites

  • A Linux server that runs JDK 1.8 or later is available.
  • IntelliJ IDEA 2018.3.5 or later, or Eclipse 3.6.0 or later, is installed.
  • RealtimeConnector.rar has been obtained from the demo package.
  • User programs write messages to MQS at no more than 6000 TPS (transactions per second).
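
The 6000 TPS limit can be respected on the client side by pacing writes. The following is a minimal sketch and is not part of the demo package: SendPacer and its methods are hypothetical names, and the sketch assumes that the limit applies to a single producer process. It keeps consecutive sends at least 1/6000 second apart.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not in the demo package): spaces sends evenly to stay under a per-second cap. */
final class SendPacer {
    private final long intervalNanos;
    private long nextSlotNanos;
    private boolean started;

    SendPacer(int maxPerSecond) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        // Round the interval up so that integer division never lets the rate exceed the cap.
        this.intervalNanos = (1_000_000_000L + maxPerSecond - 1) / maxPerSecond;
    }

    /** Reserves the next send slot and returns how many nanoseconds the caller must wait (0 if none). */
    synchronized long reserve(long nowNanos) {
        // Compare by difference so that arbitrary System.nanoTime() origins are handled correctly.
        long wait = started ? Math.max(0L, nextSlotNanos - nowNanos) : 0L;
        nextSlotNanos = nowNanos + wait + intervalNanos;
        started = true;
        return wait;
    }

    /** Blocks until the reserved slot is reached. Call this before each write to MQS. */
    void acquire() throws InterruptedException {
        long wait = reserve(System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }
}
```

In the sample listener in the procedure, calling acquire() on a shared SendPacer(6000) instance before each mqsProducer.produce(...) call would apply the pacing. The InterruptedException has to be handled inside the listener, because consumeMessage does not declare it.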

Procedure

  1. Create a Spring Boot template project. In the main method, start consuming data from the real-time data source, and use the MQS SDK to produce the consumed data to MQS.

    Sample code:

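    // The imports below assume Apache RocketMQ 4.x, SLF4J, Spring Boot, and fastjson. Adjust them to the libraries in your project.
    // MqsProducer is provided by the demo package. Import it from its package there.
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import com.alibaba.fastjson.JSONObject;
    
    @SpringBootApplication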
    public class RealtimeConnectorApplication {
        private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConnectorApplication.class);
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer rocketMQConsumer = createRocketMQConsumer();
            MqsProducer mqsProducer = new MqsProducer();
            MessageListenerConcurrently rocketmqMessageListener = new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageList,
                    ConsumeConcurrentlyContext context) {
                    for (MessageExt message : messageList) {
                        String jsonString = convertMessageToJsonString(message);
                        // Write the JSON data to MQS. "mqs-topic" is the topic created in MQS, which the FDI task consumes.
                        mqsProducer.produce("mqs-topic", jsonString);
                    }
                    LOGGER.info("Successfully processed {} messages", messageList.size());
                    // Mark the messages as consumed.
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            };
            //Register the callback implementation class to process the messages obtained from RocketMQ.
            rocketMQConsumer.registerMessageListener(rocketmqMessageListener);
            //Start RocketMQ consumption.
            rocketMQConsumer.start();
        }
     
        private static DefaultMQPushConsumer createRocketMQConsumer() throws MQClientException {
            // Instantiate the RocketMQ consumer. Replace myCompanyGroup with the actual consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCompanyGroup");
            // Set the NameServer address (host:port).
            consumer.setNamesrvAddr("localhost:9876");
            // Subscribe to a topic. The second argument is a tag filter expression; "*" matches all tags.
            consumer.subscribe("RocketMQTopic", "*");
            return consumer;
        }
     
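        /**
         * Converts a message read from RocketMQ into a JSON string. This placeholder returns fixed values.
         * Implement the actual conversion based on the RocketMQ message content.
         *
         * @param messageExt message consumed from RocketMQ
         * @return JSON string to be written to MQS
         */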
        private static String convertMessageToJsonString(MessageExt messageExt) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", 1);
            jsonObject.put("name", "zhangsan");
            return jsonObject.toJSONString();
        }
    }
  2. In the project root directory, run the following command to build an executable JAR package (for example, RealtimeConnector-1.0-SNAPSHOT.jar) in the RealtimeConnector\target directory.

    # mvn package

  3. Upload the RealtimeConnector-1.0-SNAPSHOT.jar package to the server where the JDK is installed, and run the following command to start the program in the background:

    # java -jar RealtimeConnector-1.0-SNAPSHOT.jar &

  4. Create a real-time task. The following uses an MQS data source as the source and a MySQL data source as the destination.

    Connect to the MQS data source (source) and the MySQL data source (destination), and then create a real-time task. For details, see Creating a Common Data Integration Task. After the configuration is complete, run the task to migrate data from the MQS data source to the MySQL tables.
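
The convertMessageToJsonString method in step 1 returns fixed values, and the real conversion depends on the message content. The following is a minimal sketch of one possible conversion, using only the Java standard library. The class BodyToJson, its methods, and the "key=value;key=value" body format are assumptions made for illustration and are not part of the demo package. All values are emitted as JSON strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical converter: turns a body such as "id=1;name=zhangsan" into a flat JSON object string. */
final class BodyToJson {
    static String toJson(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        // LinkedHashMap keeps the field order of the original body.
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : text.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // Skip malformed pairs (no '=' or empty key).
            }
            fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(escape(entry.getKey())).append("\":\"")
                .append(escape(entry.getValue())).append('"');
        }
        return json.append('}').toString();
    }

    /** Escapes quotes, backslashes, and control characters so the result is a valid JSON string body. */
    static String escape(String s) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }
}
```

In the connector, such a method could be called with messageExt.getBody() in place of the fixed values. The JSON field names would then need to match the source fields configured for the real-time task.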