Help Center/
MapReduce Service/
Component Operation Guide (LTS) (Ankara Region)/
Using Flink/
Enhancements to Flink SQL/
Consuming Data in drs-json Format with FlinkSQL Kafka Connector
Updated on 2024-11-29 GMT+08:00
Consuming Data in drs-json Format with FlinkSQL Kafka Connector
Scenarios
FlinkSQL needs to consume data in drs-json format (a CDC message format) in Kafka.
How to Use
In the created Kafka Connector Source stream table, set format to drs-json.
The following is a SQL example:
CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'drs-json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.System domain name' ); CREATE TABLE printSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'print' ); Insert into printSink select * from KafkaSource;
Parent topic: Enhancements to Flink SQL
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot