Help Center/ Data Lake Insight/ FAQs/ Flink Jobs/ Flink SQL/ How Does Flink Opensource SQL Parse Nested JSON?
Updated on 2023-03-21 GMT+08:00

How Does Flink Opensource SQL Parse Nested JSON?

  • Kafka message
    {
      "id": 1234567890,
      "name": "swq",
      "date": "1997-04-25",
      "obj": {
        "time1": "12:12:12",
        "str": "test",
        "lg": 1122334455
      },
      "arr": [
        "ly",
        "zpk",
        "swq",
        "zjy"
      ],
      "rowinarr": [
        {
          "f1": "f11",
          "f2": 111
        },
        {
          "f1": "f12",
          "f2": 222
        }
      ],
      "time": "13:13:13",
      "timestamp": "1997-04-25 14:14:14",
      "map": {
        "flink": 123
      },
      "mapinmap": {
        "inner_map": {
          "key": 234
        }
      }
    }
  • Flink Opensource SQL
    create table kafkaSource(
      id            BIGINT,
      name          STRING,
      `date`        DATE,
      obj           ROW<time1 TIME,str STRING,lg BIGINT>,
      arr           ARRAY<STRING>,
      rowinarr      ARRAY<ROW<f1 STRING,f2 INT>>,
      `time`        TIME,
      `timestamp`   TIMESTAMP(3),
      `map`         MAP<STRING,BIGINT>,
      mapinmap      MAP<STRING,MAP<STRING,INT>>
    ) with (
      'connector' = 'kafka',
      'topic' = 'topic-swq-3',
      'properties.bootstrap.servers' = '10.128.0.138:9092,10.128.0.119:9092,10.128.0.212:9092',
      'properties.group.id' = 'swq-test',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    create table printSink (
      id            BIGINT,
      name          STRING,
      `date`        DATE,
      str           STRING,
      arr           ARRAY<STRING>,
      nameinarray   STRING,
      rowinarr      ARRAY<ROW<f1 STRING,f2 INT>>,
      f2            INT,
      `time`        TIME,
      `timestamp`   TIMESTAMP(3),
      `map`         MAP<STRING,BIGINT>,
      flink         BIGINT,
      mapinmap      MAP<STRING,MAP<STRING,INT>>,
      `key`         INT
    ) with ('connector' = 'print');
     
    insert into
      printSink
    select
      id,
      name,
      `date`,
      obj.str,
      arr,
      arr[4],
      rowinarr,
      rowinarr[1].f2,
      `time`,
      `timestamp`,
      `map`,
      `map`['flink'],
      mapinmap,
      mapinmap['inner_map']['key']
    from kafkaSource;
  • Result
    +I(1234567890,swq,1997-04-25,test,[ly, zpk, swq, zjy],zjy,[f11,111, f12,222],111,13:13:13,1997-04-25T14:14:14,{flink=123},123,{inner_map={key=234}},234)
  1. Use the following methods to obtain elements in the containers of different types.

    - map: map['key']

    - array: array[index]

    - row: row.key

  2. The index of an array starts from 1. Array[1] is the first element.
  3. The elements of an array must be of the same type, and the elements of a row can be of different types.