文档首页/ 数据湖探索 DLI/ 常见问题/ Flink作业类/ Flink SQL作业类/ Flink Opensource SQL如何解析复杂嵌套 JSON?
更新时间:2024-11-08 GMT+08:00

Flink Opensource SQL如何解析复杂嵌套 JSON?

  • kafka message
    {
      "id": 1234567890,
      "name": "swq",
      "date": "1997-04-25",
      "obj": {
        "time1": "12:12:12",
        "str": "test",
        "lg": 1122334455
      },
      "arr": [
        "ly",
        "zpk",
        "swq",
        "zjy"
      ],
      "rowinarr": [
        {
          "f1": "f11",
          "f2": 111
        },
        {
          "f1": "f12",
          "f2": 222
        }
      ],
      "time": "13:13:13",
      "timestamp": "1997-04-25 14:14:14",
      "map": {
        "flink": 123
      },
      "mapinmap": {
        "inner_map": {
          "key": 234
        }
      }
    }
  • flink opensource sql
    create table kafkaSource(
      id            BIGINT,
      name          STRING,
      `date`        DATE,
      obj           ROW<time1 TIME,str STRING,lg BIGINT>,
      arr           ARRAY<STRING>,
      rowinarr      ARRAY<ROW<f1 STRING,f2 INT>>,
      `time`        TIME,
      `timestamp`   TIMESTAMP(3),
      `map`         MAP<STRING,BIGINT>,
      mapinmap      MAP<STRING,MAP<STRING,INT>>
    ) with (
      'connector' = 'kafka',
      'topic' = 'topic-swq-3',
      'properties.bootstrap.servers' = '10.128.0.138:9092,10.128.0.119:9092,10.128.0.212:9092',
      'properties.group.id' = 'swq-test',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    create table printSink (
      id            BIGINT,
      name          STRING,
      `date`        DATE,
      str           STRING,
      arr           ARRAY<STRING>,
      nameinarray   STRING,
      rowinarr      ARRAY<ROW<f1 STRING,f2 INT>>,
      f2            INT,
      `time`        TIME,
      `timestamp`   TIMESTAMP(3),
      `map`         MAP<STRING,BIGINT>,
      flink         BIGINT,
      mapinmap      MAP<STRING,MAP<STRING,INT>>,
      `key`         INT
    ) with ('connector' = 'print');
     
    insert into
      printSink
    select
      id,
      name,
      `date`,
      obj.str,
      arr,
      arr[4],
      rowinarr,
      rowinarr[1].f2,
      `time`,
      `timestamp`,
      `map`,
      `map`['flink'],
      mapinmap,
      mapinmap['inner_map']['key']
    from kafkaSource;
  • result
    +I(1234567890,swq,1997-04-25,test,[ly, zpk, swq, zjy],zjy,[f11,111, f12,222],111,13:13:13,1997-04-25T14:14:14,{flink=123},123,{inner_map={key=234}},234)
  1. 各数据类型获取元素的方法:

    - map:map['key']

    - array:array[index]

    - row:row.key

  2. array 的起始下标从 1 开始,即 array[1] 是 array 的第一个元素。
  3. array 的元素必须同类型,row 的元素可以不同类型。