Data Types
Overview
A data type is a basic attribute of data, used to distinguish different categories of data. Different data types occupy different amounts of storage space and support different operations. Data in a database is stored in tables, and every column of a table has a defined data type. When storing data, you must comply with the attributes of these data types; otherwise errors may occur.
Flink SQL on the big data platform is consistent with the open-source community: it supports native data types, complex data types, and nesting of complex types.
Native Data Types
Flink SQL supports the native data types listed in Table 1.
| Data Type | Description | Storage | Range |
|---|---|---|---|
| VARCHAR | Variable-length character string | - | - |
| BOOLEAN | Boolean | - | TRUE/FALSE |
| TINYINT | Signed integer | 1 byte | -128 ~ 127 |
| SMALLINT | Signed integer | 2 bytes | -32768 ~ 32767 |
| INT | Signed integer | 4 bytes | -2147483648 ~ 2147483647 |
| INTEGER | Signed integer | 4 bytes | -2147483648 ~ 2147483647 |
| BIGINT | Signed integer | 8 bytes | -9223372036854775808 ~ 9223372036854775807 |
| REAL | Single-precision floating point | 4 bytes | - |
| FLOAT | Single-precision floating point | 4 bytes | - |
| DOUBLE | Double-precision floating point | 8 bytes | - |
| DECIMAL | Numeric type with fixed precision and scale | - | - |
| DATE | Date, describing a specific year, month, and day in yyyy-MM-dd format, for example 2014-05-29 | - | DATE does not include a time of day; the representable range is 0000-01-01 to 9999-12-31 |
| TIME | Time of day in HH:mm:ss format, for example 20:17:40 | - | - |
| TIMESTAMP(3) | Full date and time, for example 1969-07-20 20:17:40 | - | - |
| INTERVAL timeUnit [TO timeUnit] | Time interval, for example INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY | - | - |
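The integer ranges in the table follow directly from two's-complement storage widths. As a quick sanity check (not part of the original documentation), the ranges can be derived in Python from the byte sizes alone:

```python
# Two's-complement range for an n-byte signed integer: [-2^(8n-1), 2^(8n-1) - 1].
def signed_range(n_bytes):
    bits = 8 * n_bytes
    return -2 ** (bits - 1), 2 ** (bits - 1) - 1

assert signed_range(1) == (-128, 127)                    # TINYINT
assert signed_range(2) == (-32768, 32767)                # SMALLINT
assert signed_range(4) == (-2147483648, 2147483647)      # INT / INTEGER
assert signed_range(8) == (-9223372036854775808,
                           9223372036854775807)          # BIGINT
```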
Complex Data Types
Flink SQL supports complex data types and nesting of complex types. The complex data types are listed in Table 2.
| Data Type | Description | Declaration | Access | Construction |
|---|---|---|---|---|
| ARRAY | An ordered collection of fields; all fields must have the same data type. | ARRAY[TYPE] | variable[index]; indexes start at 1, for example v1[1] | Array[value1, value2, ...] as v1 |
| MAP | An unordered collection of key/value pairs. Keys must be of a native data type; values may be of a native or complex data type. Within one MAP, all keys must have the same type, and all values must have the same type. | MAP[TYPE, TYPE] | variable[key], for example v1[key] | Map[key1, value1, key2, value2, ...] as v1 |
| ROW | A collection of named fields; the fields may have different data types. | ROW<a1 TYPE1, a2 TYPE2> | variable.field, for example v1.a1 | Row('1', 2) as v1 |
```sql
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  address ROW<city STRING, province STRING, country STRING>,
  average_speed MAP[STRING, LONG],
  speeds ARRAY[LONG]
)
WITH (
  type = "dis",
  region = "xxx",
  channel = "dliinput",
  encode = "json"
);

CREATE TEMP STREAM car_speed_infos (
  car_id STRING,
  province STRING,
  average_speed LONG,
  start_speed LONG
);

INSERT INTO car_speed_infos
SELECT
  car_id,
  address.province,
  average_speed[address.city],
  speeds[1]
FROM car_infos;
```
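To make the field-access semantics concrete, the query above can be traced in plain Python on one hypothetical sample record (an illustration only, not the platform's execution model; note that SQL ARRAY indexes start at 1 while Python lists start at 0):

```python
# One incoming car_infos record, as it would look after JSON decoding.
# The field values are made up for illustration.
record = {
    "car_id": "A123",
    "address": {"city": "Shanghai", "province": "Shanghai", "country": "CN"},
    "average_speed": {"Shanghai": 80, "Beijing": 75},
    "speeds": [60, 70, 80],
}

# Equivalent of:
#   SELECT car_id, address.province, average_speed[address.city], speeds[1]
out = {
    "car_id": record["car_id"],
    "province": record["address"]["province"],       # ROW access: variable.field
    "average_speed": record["average_speed"][record["address"]["city"]],  # MAP access: variable[key]
    "start_speed": record["speeds"][0],              # SQL speeds[1] is the FIRST element
}
assert out == {"car_id": "A123", "province": "Shanghai",
               "average_speed": 80, "start_speed": 60}
```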
Nesting of Complex Types
- Enhanced JSON format
- json_schema configuration is supported. Once json_schema is configured, the fields do not need to be declared in the DDL; they are generated automatically from the json_schema. Example:
```sql
CREATE SOURCE STREAM data_with_schema
WITH (
  type = "dis",
  region = "xxx",
  channel = "dis-in",
  encode = "json",
  json_schema = '{"definitions":{"address":{"type":"object","properties":{"street_address":{"type":"string"},"city":{"type":"string"},"state":{"type":"string"}},"required":["street_address","city","state"]}},"type":"object","properties":{"billing_address":{"$ref":"#/definitions/address"},"shipping_address":{"$ref":"#/definitions/address"},"optional_address":{"oneOf":[{"type":"null"},{"$ref":"#/definitions/address"}]}}}'
);

CREATE SINK STREAM buy_infos (
  billing_address_city STRING,
  shipping_address_state STRING
)
WITH (
  type = "obs",
  encode = "csv",
  region = "xxx",
  field_delimiter = ",",
  row_delimiter = "\n",
  obs_dir = "bucket/car_infos",
  file_prefix = "over",
  rolling_size = "100m"
);

INSERT INTO buy_infos
SELECT billing_address.city, shipping_address.state
FROM data_with_schema;
```

Sample data:

```json
{
  "billing_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  },
  "shipping_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  }
}
```

- Parsing is also supported when neither json_schema nor json_config is configured. For how to use json_config, see the open-source Kafka input stream example.
In this case, the attribute names in the DDL are used as the JSON keys for parsing by default.
The test data below includes both nested JSON fields, such as billing_address and shipping_address, and the non-nested fields id and type2.
```json
{
  "id": "1",
  "type2": "online",
  "billing_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  },
  "shipping_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  }
}
```

The corresponding table creation and usage are as follows:

```sql
CREATE SOURCE STREAM car_info_data (
  id STRING,
  type2 STRING,
  billing_address ROW<street_address STRING, city STRING, state STRING>,
  shipping_address ROW<street_address STRING, city STRING, state STRING>,
  optional_address ROW<street_address STRING, city STRING, state STRING>
)
WITH (
  type = "dis",
  region = "xxx",
  channel = "dis-in",
  encode = "json"
);

CREATE SINK STREAM buy_infos (
  id STRING,
  type2 STRING,
  billing_address_city STRING,
  shipping_address_state STRING
)
WITH (
  type = "obs",
  encode = "csv",
  region = "xxx",
  field_delimiter = ",",
  row_delimiter = "\n",
  obs_dir = "bucket/car_infos",
  file_prefix = "over",
  rolling_size = "100m"
);

INSERT INTO buy_infos
SELECT id, type2, billing_address.city, shipping_address.state
FROM car_info_data;
```
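The default parse-by-DDL-attribute-name behavior can be sketched in Python (a simplified illustration, not the platform's actual parser; the `ddl_fields` mapping mirrors the DDL above):

```python
import json

# DDL fields: top-level name -> nested ROW field names (None means a plain field).
ddl_fields = {
    "id": None,
    "type2": None,
    "billing_address": ["street_address", "city", "state"],
    "shipping_address": ["street_address", "city", "state"],
    "optional_address": ["street_address", "city", "state"],
}

raw = ('{"id":"1","type2":"online",'
       '"billing_address":{"street_address":"xxx","city":"xxx","state":"xxx"},'
       '"shipping_address":{"street_address":"xxx","city":"xxx","state":"xxx"}}')

data = json.loads(raw)

# Each DDL attribute name is looked up directly as a JSON key; a key missing
# from the input (optional_address here) simply yields None (SQL NULL).
row = {}
for name, subfields in ddl_fields.items():
    value = data.get(name)
    if subfields is not None and value is not None:
        value = {f: value.get(f) for f in subfields}
    row[name] = value

assert row["id"] == "1"
assert row["billing_address"]["city"] == "xxx"
assert row["optional_address"] is None
```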
- Sink serialization supports complex types
- Currently only the CSV and JSON formats support complex types.
- For JSON, see the enhanced JSON format above.
- Because CSV has no standard format, source parsing is currently not supported; only sink is supported.
- Output format: kept as consistent as possible with native Flink.
Map: {key1=Value1, key2=Value2}
Row: fields are flattened and separated by commas, for example Row(1, '2') => 1,'2'
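The two rendering rules above can be sketched as small Python helpers (hypothetical illustrations of the described output, not the actual serializer):

```python
# Render a MAP as {key1=Value1, key2=Value2}, as described above.
def render_map(m):
    return "{" + ", ".join(f"{k}={v}" for k, v in m.items()) + "}"

# Flatten a ROW's fields with commas, quoting strings, so that
# Row(1, '2') becomes 1,'2'.
def render_row(fields):
    return ",".join(repr(f) if isinstance(f, str) else str(f) for f in fields)

assert render_map({"key1": "Value1", "key2": "Value2"}) == "{key1=Value1, key2=Value2}"
assert render_row([1, "2"]) == "1,'2'"
```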

