更新时间:2024-04-19 GMT+08:00

JSON Format

功能描述

JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。更多具体使用可参考开源社区文档:JSON Format

支持的Connector

  • Kafka
  • Upsert Kafka
  • Elasticsearch

参数说明

表1

参数

是否必选

默认值

类型

说明

format

(none)

String

声明使用的格式,这里应为'json'。

json.fail-on-missing-field

false

Boolean

当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。

json.ignore-parse-errors

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

json.timestamp-format.standard

'SQL'

String

声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601':
  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。

json.map-null-key.mode

'FALL'

String

指定处理 Map 中 key 值为空的方法。当前支持的值有:'FAIL','DROP'和'LITERAL'。

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。

json.map-null-key.literal

'null'

String

当 'json.map-null-key.mode' 是LITERAL的时候,指定字符串常量替换 Map 中的空 key 值。

json.encode.decimal-as-plain-number

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。

数据类型映射

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。

在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。

下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

表2 数据类型映射

Flink SQL类型

JSON类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object

示例

该示例是从kafka的一个topic中读取数据,并使用kafka sink将数据写入到kafka的另一个topic中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功;否则表示未成功
  2. 创建flink opensource sql作业,并选择flink版本为1.15,选择保存日志,然后提交并运行,其SQL代码如下:

    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE printSink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from kafkaSource;

  3. 向作为source的kafka的topic中插入下列数据:

    {"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"}
    
    {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}

  4. 按照如下方式查看taskmanager.out文件中的数据结果:

    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
      +I[202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106]
      +I[202103241606060001, appShop11, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106]