更新时间:2024-11-06 GMT+08:00
Flink Opensource SQL如何解析复杂嵌套 JSON?
- Kafka message
{ "id": 1234567890, "name": "swq", "date": "1997-04-25", "obj": { "time1": "12:12:12", "str": "test", "lg": 1122334455 }, "arr": [ "ly", "zpk", "swq", "zjy" ], "rowinarr": [ { "f1": "f11", "f2": 111 }, { "f1": "f12", "f2": 222 } ], "time": "13:13:13", "timestamp": "1997-04-25 14:14:14", "map": { "flink": 123 }, "mapinmap": { "inner_map": { "key": 234 } } }
- Flink OpenSource SQL
-- Source table: consumes JSON-formatted records from a Kafka topic.
-- Nested JSON objects are declared as ROW (named fields) or MAP (string keys),
-- JSON arrays as ARRAY, and arrays of objects as ARRAY<ROW<...>>.
-- Column names that collide with SQL keywords are quoted with backticks.
CREATE TABLE kafkaSource (
  id          BIGINT,
  name        STRING,
  `date`      DATE,
  obj         ROW<time1 TIME, str STRING, lg BIGINT>,
  arr         ARRAY<STRING>,
  rowinarr    ARRAY<ROW<f1 STRING, f2 INT>>,
  `time`      TIME,
  `timestamp` TIMESTAMP(3),
  `map`       MAP<STRING, BIGINT>,
  mapinmap    MAP<STRING, MAP<STRING, INT>>
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-swq-3',
  'properties.bootstrap.servers' = '10.128.0.138:9092,10.128.0.119:9092,10.128.0.212:9092',
  'properties.group.id' = 'swq-test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Sink table: prints every row it receives.
-- It carries both the original nested columns and the scalar values extracted from them.
CREATE TABLE printSink (
  id          BIGINT,
  name        STRING,
  `date`      DATE,
  str         STRING,
  arr         ARRAY<STRING>,
  nameinarray STRING,
  rowinarr    ARRAY<ROW<f1 STRING, f2 INT>>,
  f2          INT,
  `time`      TIME,
  `timestamp` TIMESTAMP(3),
  `map`       MAP<STRING, BIGINT>,
  flink       BIGINT,
  mapinmap    MAP<STRING, MAP<STRING, INT>>,
  `key`       INT
) WITH (
  'connector' = 'print'
);

-- Copy each source record to the sink, extracting nested values along the way.
-- Select items are matched to the sink columns by position.
INSERT INTO printSink
SELECT
  id,
  name,
  `date`,
  obj.str,                        -- field of a ROW            -> str
  arr,
  arr[4],                         -- 4th array element (indexes start at 1) -> nameinarray
  rowinarr,
  rowinarr[1].f2,                 -- field f2 of the 1st ROW in the array   -> f2
  `time`,
  `timestamp`,
  `map`,
  `map`['flink'],                 -- map value looked up by key             -> flink
  mapinmap,
  mapinmap['inner_map']['key']    -- chained lookup through a nested map    -> `key`
FROM kafkaSource;
- result
+I(1234567890,swq,1997-04-25,test,[ly, zpk, swq, zjy],zjy,[f11,111, f12,222],111,13:13:13,1997-04-25T14:14:14,{flink=123},123,{inner_map={key=234}},234)
父主题: Flink SQL作业类