更新时间:2022-11-09 GMT+08:00
Flink Opensource SQL如何解析复杂嵌套 JSON?
- kafka message
{ "id": 1234567890, "name": "swq", "date": "1997-04-25", "obj": { "time1": "12:12:12", "str": "test", "lg": 1122334455 }, "arr": [ "ly", "zpk", "swq", "zjy" ], "rowinarr": [ { "f1": "f11", "f2": 111 }, { "f1": "f12", "f2": 222 } ], "time": "13:13:13", "timestamp": "1997-04-25 14:14:14", "map": { "flink": 123 }, "mapinmap": { "inner_map": { "key": 234 } } }
- flink opensource sql
create table kafkaSource( id BIGINT, name STRING, `date` DATE, obj ROW<time1 TIME,str STRING,lg BIGINT>, arr ARRAY<STRING>, rowinarr ARRAY<ROW<f1 STRING,f2 INT>>, `time` TIME, `timestamp` TIMESTAMP(3), `map` MAP<STRING,BIGINT>, mapinmap MAP<STRING,MAP<STRING,INT>> ) with ( 'connector' = 'kafka', 'topic' = 'topic-swq-3', 'properties.bootstrap.servers' = '10.128.0.138:9092,10.128.0.119:9092,10.128.0.212:9092', 'properties.group.id' = 'swq-test', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table printSink ( id BIGINT, name STRING, `date` DATE, str STRING, arr ARRAY<STRING>, nameinarray STRING, rowinarr ARRAY<ROW<f1 STRING,f2 INT>>, f2 INT, `time` TIME, `timestamp` TIMESTAMP(3), `map` MAP<STRING,BIGINT>, flink BIGINT, mapinmap MAP<STRING,MAP<STRING,INT>>, `key` INT ) with ('connector' = 'print'); insert into printSink select id, name, `date`, obj.str, arr, arr[4], rowinarr, rowinarr[1].f2, `time`, `timestamp`, `map`, `map`['flink'], mapinmap, mapinmap['inner_map']['key'] from kafkaSource;
- result
+I(1234567890,swq,1997-04-25,test,[ly, zpk, swq, zjy],zjy,[f11,111, f12,222],111,13:13:13,1997-04-25T14:14:14,{flink=123},123,{inner_map={key=234}},234)
父主题: Flink SQL作业相关问题
Flink SQL作业相关问题 所有常见问题
- Flink SQL作业的消费能力如何,即一天可以处理多大的数据量?
- Flink SQL中的temp流中数据是否需要定期清理,如何清理?
- 创建Flink SQL作业时选择OBS桶,提示未授权
- Flink SQL作业将OBS表映射为DLI的分区表
- Flink SQL作业Kafka分区数增加或减少,不用停止Flink作业,实现动态感知
- OBS表如何映射为DLI的分区表?
- 在Flink SQL作业中创建表使用EL表达式,作业运行报DLI.0005错误
- Flink作业输出流写入数据到OBS,通过该OBS文件路径创建的DLI表查询无数据
- Flink SQL作业运行失败,日志中有connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null错误
- Flink SQL作业读取DIS数据报Not authorized错误
- Flink SQL作业消费Kafka后sink到es集群,作业执行成功,但未写入数据
- Flink Opensource SQL如何解析复杂嵌套 JSON?
- Flink Opensource SQL从RDS数据库读取的时间和RDS数据库存储的时间为什么会不一致?
- Flink SQL和Flink Opensource SQL的语法有什么区别?
- Flink Opensource SQL Elasticsearch结果表failure-handler参数填写retry_rejected导致提交失败
more