How Does Flink Opensource SQL Parse Nested JSON?
Updated on 2023-03-21 GMT+08:00
- Kafka message
{ "id": 1234567890, "name": "swq", "date": "1997-04-25", "obj": { "time1": "12:12:12", "str": "test", "lg": 1122334455 }, "arr": [ "ly", "zpk", "swq", "zjy" ], "rowinarr": [ { "f1": "f11", "f2": 111 }, { "f1": "f12", "f2": 222 } ], "time": "13:13:13", "timestamp": "1997-04-25 14:14:14", "map": { "flink": 123 }, "mapinmap": { "inner_map": { "key": 234 } } }
- Flink Opensource SQL
create table kafkaSource (
  id BIGINT,
  name STRING,
  `date` DATE,
  obj ROW<time1 TIME, str STRING, lg BIGINT>,
  arr ARRAY<STRING>,
  rowinarr ARRAY<ROW<f1 STRING, f2 INT>>,
  `time` TIME,
  `timestamp` TIMESTAMP(3),
  `map` MAP<STRING, BIGINT>,
  mapinmap MAP<STRING, MAP<STRING, INT>>
) with (
  'connector' = 'kafka',
  'topic' = 'topic-swq-3',
  'properties.bootstrap.servers' = '10.128.0.138:9092,10.128.0.119:9092,10.128.0.212:9092',
  'properties.group.id' = 'swq-test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

create table printSink (
  id BIGINT,
  name STRING,
  `date` DATE,
  str STRING,
  arr ARRAY<STRING>,
  nameinarray STRING,
  rowinarr ARRAY<ROW<f1 STRING, f2 INT>>,
  f2 INT,
  `time` TIME,
  `timestamp` TIMESTAMP(3),
  `map` MAP<STRING, BIGINT>,
  flink BIGINT,
  mapinmap MAP<STRING, MAP<STRING, INT>>,
  `key` INT
) with (
  'connector' = 'print'
);

insert into printSink
select
  id,
  name,
  `date`,
  obj.str,
  arr,
  arr[4],
  rowinarr,
  rowinarr[1].f2,
  `time`,
  `timestamp`,
  `map`,
  `map`['flink'],
  mapinmap,
  mapinmap['inner_map']['key']
from kafkaSource;
- Result
+I(1234567890,swq,1997-04-25,test,[ly, zpk, swq, zjy],zjy,[f11,111, f12,222],111,13:13:13,1997-04-25T14:14:14,{flink=123},123,{inner_map={key=234}},234)
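The statement above reads single array elements by index (arr[4], rowinarr[1].f2). If every element of an array of rows is needed rather than one position, Flink SQL also supports UNNEST, which expands the array into one output row per element. A minimal sketch, reusing the kafkaSource table defined above (this query is not part of the original example):

select id, t.f1, t.f2
from kafkaSource
cross join unnest(rowinarr) as t (f1, f2);
-- For the sample message this emits two rows: (1234567890, f11, 111) and (1234567890, f12, 222).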
- Use the following methods to obtain elements from containers of different types (a standalone sketch follows these notes):
  - array: array[index]
  - map: map[key]
  - row: row.key
- The index of an array starts from 1; array[1] is the first element.
- The elements of an array must all be of the same type, while the fields of a row can be of different types.
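Each access pattern can be tried in isolation against the same source table. A short sketch, assuming the kafkaSource table and the sample message above (expected values shown as comments):

select
  arr[1]                       as arr_first,   -- 'ly': array access, 1-based index
  `map`['flink']               as map_value,   -- 123: map access by key
  obj.str                      as row_field,   -- 'test': row access by field name
  rowinarr[2].f1               as nested_row,  -- 'f12': row nested inside an array
  mapinmap['inner_map']['key'] as nested_map   -- 234: map nested inside a map
from kafkaSource;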
Parent topic: Flink SQL