JSON Format
功能描述
JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。更多具体使用可参考开源社区文档:JSON Format。
支持的Connector
- Kafka
- Upsert Kafka
- Elasticsearch
参数说明
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
format |
是 |
(none) |
String |
声明使用的格式,这里应为'json'。 |
json.fail-on-missing-field |
否 |
false |
Boolean |
当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。 |
json.ignore-parse-errors |
否 |
false |
Boolean |
当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
json.timestamp-format.standard |
否 |
'SQL' |
String |
声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601':
|
json.map-null-key.mode |
否 |
'FALL' |
String |
指定处理 Map 中 key 值为空的方法。当前支持的值有:'FAIL','DROP'和'LITERAL'。
|
json.map-null-key.literal |
否 |
'null' |
String |
当 'json.map-null-key.mode' 是LITERAL的时候,指定字符串常量替换 Map 中的空 key 值。 |
json.encode.decimal-as-plain-number |
否 |
false |
Boolean |
将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 |
数据类型映射
当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL类型 |
JSON类型 |
---|---|
CHAR / VARCHAR / STRING |
string |
BOOLEAN |
boolean |
BINARY / VARBINARY |
string with encoding: base64 |
DECIMAL |
number |
TINYINT |
number |
SMALLINT |
number |
INT |
number |
BIGINT |
number |
FLOAT |
number |
DOUBLE |
number |
DATE |
string with format: date |
TIME |
string with format: time |
TIMESTAMP |
string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
string with format: date-time (with UTC time zone) |
INTERVAL |
number |
ARRAY |
array |
MAP / MULTISET |
object |
ROW |
object |
示例
该示例是从kafka的一个topic中读取数据,并使用kafka sink将数据写入到kafka的另一个topic中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功;否则表示未成功
- 创建flink opensource sql作业,并选择flink版本为1.15,选择保存日志,然后提交并运行,其SQL代码如下:
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from kafkaSource;
- 向作为source的kafka的topic中插入下列数据:
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取.out文件查看结果日志。
+I[202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106] +I[202103241606060001, appShop11, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106]