(Example) Developing a Custom Data Source for a Real-Time Task
Scenarios
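A custom connector can bring data from a real-time data source into FDI through message forwarding: the connector consumes data from the source and writes it to an MQS topic, which an FDI task then consumes. This section uses Java and an MQS data source as an example. For the demo code, see RealtimeConnector.rar.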
Prerequisites
- A Linux server that runs JDK 1.8 or later is available.
- IntelliJ IDEA 2018.3.5 or later, or Eclipse 3.6.0 or later, is installed.
- MysqlConnctor.rar has been obtained from the demo package (sha256:34c9bc8d99eba4ed193603019ce2b69afa3ed760a452231ece3c89fd7dd74da1).
- User programs write messages to MQS at a rate of no more than 6000 TPS (transactions per second).
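One way to respect the 6000 TPS limit is to throttle writes on the client side before each message is produced to MQS. The following is a minimal sketch of a fixed-window throttle using only the JDK. The class name MqsWriteThrottle and its methods are hypothetical and are not part of the MQS SDK or the demo package.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of the MQS SDK): a fixed-window throttle that
 * grants at most maxPerSecond permits in each one-second window.
 */
final class MqsWriteThrottle {
    private final int maxPerSecond;
    private final LongSupplier clockMillis; // injected clock, in milliseconds
    private long windowStart;
    private int usedInWindow;

    MqsWriteThrottle(int maxPerSecond, LongSupplier clockMillis) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.maxPerSecond = maxPerSecond;
        this.clockMillis = clockMillis;
        this.windowStart = clockMillis.getAsLong();
    }

    /** Returns true if one more message may be written in the current window. */
    synchronized boolean tryAcquire() {
        long now = clockMillis.getAsLong();
        if (now - windowStart >= 1000L) {
            // A full second has passed: open a new window and reset the counter.
            windowStart = now;
            usedInWindow = 0;
        }
        if (usedInWindow < maxPerSecond) {
            usedInWindow++;
            return true;
        }
        return false;
    }

    /** Blocks the calling thread until a permit is available. */
    void acquire() throws InterruptedException {
        while (!tryAcquire()) {
            Thread.sleep(1L);
        }
    }
}
```

A throttle could be created once, for example with new MqsWriteThrottle(5000, () -> System.nanoTime() / 1_000_000L), and acquire() called before each write to MQS. A fixed window can let two bursts land close together around a window boundary, so the example value 5000 (an assumption, not a documented setting) leaves some headroom below 6000.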
Procedure
- Create a Spring Boot template project. In the main method, start consuming data from the real-time data source, and use the MQS SDK to produce the consumed data to MQS.
Sample code:
@SpringBootApplication
public class RealtimeConnectorApplication {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConnectorApplication.class);

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer rocketMQConsumer = createRocketMQConsumer();
        MqsProducer mqsProducer = new MqsProducer();
        MessageListenerConcurrently rocketmqMessageListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageList, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messageList) {
                    String jsonString = convertMessageToJsonString(message);
                    //Write JSON data to MQS. mqs-topic indicates the created topic, which will be consumed by FDI tasks.
                    mqsProducer.produce("mqs-topic", jsonString);
                }
                LOGGER.info("Success to process {} data", messageList.size());
                //Mark the message as consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        //Register the callback implementation class to process the messages obtained from RocketMQ.
        rocketMQConsumer.registerMessageListener(rocketmqMessageListener);
        //Start RocketMQ consumption.
        rocketMQConsumer.start();
    }

    private static DefaultMQPushConsumer createRocketMQConsumer() throws MQClientException {
        //Instantiate the RocketMQ consumer. Enter the actual consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCompanyGroup");
        //Set the NameServer IP address.
        consumer.setNamesrvAddr("localhost:9876");
        //Subscribe to one or more topics and use tags to filter messages to consume.
        consumer.subscribe("RocketMQTopic", "*");
        return consumer;
    }

    /**
     * Convert the messages read by RocketMQ into JSON strings. The actual conversion is implemented based on the RocketMQ message content.
     *
     * @param messageExt
     * @return
     */
    private static String convertMessageToJsonString(MessageExt messageExt) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", 1);
        jsonObject.put("name", "zhangsan");
        return jsonObject.toJSONString();
    }
}
- Run the following command in the project root directory to generate an executable JAR package (for example, RealtimeConnector-1.0-SNAPSHOT.jar) in the RealtimeConnector\target directory.
# mvn package
- Upload the RealtimeConnector-1.0-SNAPSHOT.jar package from your Linux or Windows machine to the server that runs the JDK, and run the following command on that server to start the connector in the background:
# java -jar RealtimeConnector-1.0-SNAPSHOT.jar &
- Create a real-time task. The following uses an MQS data source as the source and MySQL as the destination.
Configure the MQS data source as the source and the MySQL data source as the destination, and then create the real-time task. For details, see Creating a Common Data Integration Task. After the configuration is complete, run the task to migrate data from the MQS data source to the MySQL tables.
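The convertMessageToJsonString method in the sample code returns fixed values and must be replaced with a conversion that matches the actual message content. As an illustration only, the following sketch assumes that each message body (the byte array returned by MessageExt.getBody()) is UTF-8 text in the form id,name, for example 1,zhangsan. This payload format, the class name MessageJsonSketch, and its methods are assumptions made for this example and do not come from the demo package.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical converter for message bodies of the assumed form "id,name". */
final class MessageJsonSketch {

    /** Converts a body such as "1,zhangsan" into {"id":1,"name":"zhangsan"}. */
    static String bodyToJson(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8).trim();
        int comma = text.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Expected \"id,name\" but got: " + text);
        }
        // Everything before the first comma is the numeric id; the rest is the name.
        long id = Long.parseLong(text.substring(0, comma).trim());
        String name = text.substring(comma + 1).trim();
        return "{\"id\":" + id + ",\"name\":\"" + escape(name) + "\"}";
    }

    /** Escapes characters that are not allowed unescaped inside a JSON string. */
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }
}
```

In the sample application, the body of convertMessageToJsonString could then delegate to MessageJsonSketch.bodyToJson(messageExt.getBody()). The JSON field names should match the fields that the FDI task maps to the MySQL table columns. For anything beyond a flat record, a JSON library such as the one already used in the sample code is likely a better choice than hand-built strings.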