Data Types
Overview
A data type is a basic attribute of data, used to distinguish different categories of data. Different data types occupy different amounts of storage space and support different operations. Data in a database is stored in tables, and each column of a table defines a data type. When storing data, users must comply with these data type attributes; otherwise, errors may occur.
Flink SQL on the Huawei big data platform is consistent with the open-source community, supporting native data types, complex data types, and nested complex types.
Native Data Types
Flink SQL supports the native data types listed in Table 1.
| Data Type | Description | Storage Space | Range |
|---|---|---|---|
| VARCHAR | Variable-length character string | - | - |
| BOOLEAN | Boolean type | - | TRUE/FALSE |
| TINYINT | Signed integer | 1 byte | -128 to 127 |
| SMALLINT | Signed integer | 2 bytes | -32768 to 32767 |
| INT | Signed integer | 4 bytes | -2147483648 to 2147483647 |
| INTEGER | Signed integer | 4 bytes | -2147483648 to 2147483647 |
| BIGINT | Signed integer | 8 bytes | -9223372036854775808 to 9223372036854775807 |
| REAL | Single-precision floating point | 4 bytes | - |
| FLOAT | Single-precision floating point | 4 bytes | - |
| DOUBLE | Double-precision floating point | 8 bytes | - |
| DECIMAL | Numeric type with fixed precision and scale | - | - |
| DATE | Date type describing a specific year, month, and day in yyyy-MM-dd format, for example 2014-05-29 | - | DATE does not include time; the representable range is 0000-01-01 to 9999-12-31 |
| TIME | Time type in HH:mm:ss format, for example 20:17:40 | - | - |
| TIMESTAMP(3) | Full date, including both date and time, for example 1969-07-20 20:17:40 | - | - |
| INTERVAL timeUnit [TO timeUnit] | Time interval, for example INTERVAL '1:5' YEAR TO MONTH or INTERVAL '45' DAY | - | - |
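As a quick illustration of these types in a DDL, the following is a minimal sketch; the stream name, column names, and connector settings (type, region, channel) are hypothetical placeholders modeled on the examples later in this section:

```sql
CREATE SOURCE STREAM order_infos (
  order_id VARCHAR,       -- variable-length character string
  quantity INT,           -- 4-byte signed integer
  price DOUBLE,           -- double-precision floating point
  paid BOOLEAN,           -- TRUE/FALSE
  pay_time TIMESTAMP(3)   -- full date and time
) WITH (
  type = "dis",
  region = "xxx",
  channel = "dis-in",
  encode = "json"
);
```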
Complex Data Types
Flink SQL supports complex data types and nested complex types. Table 2 describes the complex data types.
| Data Type | Description | Declaration | Reference | Construction |
|---|---|---|---|---|
| ARRAY | An ordered collection of fields; all fields must have the same data type. | ARRAY[TYPE] | variable[index]; the index starts at 1, for example v1[1] | Array[value1, value2, ...] as v1 |
| MAP | An unordered collection of key/value pairs. Keys must be of a native data type; values may be of a native or complex data type. Within one MAP, all keys must have the same type, and all values must have the same type. | MAP[TYPE, TYPE] | variable[key], for example v1[key] | Map[key1, value1, key2, value2, ...] as v1 |
| ROW | A collection of named fields; the fields may have different data types. | ROW<a1 TYPE1, a2 TYPE2> | variable.fieldName, for example v1.a1 | Row('1', 2) as v1 |
For example:

```sql
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  address ROW<city STRING, province STRING, country STRING>,
  average_speed MAP[STRING, LONG],
  speeds ARRAY[LONG]
) WITH (
  type = "dis",
  region = "xxx",
  channel = "dliinput",
  encode = "json"
);

CREATE TEMP STREAM car_speed_infos (
  car_id STRING,
  province STRING,
  average_speed LONG,
  start_speed LONG
);

INSERT INTO car_speed_infos
SELECT
  car_id,
  address.province,
  average_speed[address.city],
  speeds[1]
FROM car_infos;
```
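The Construction column of Table 2 can also be used directly in a query. The following is a minimal sketch, not taken from this guide, that reuses the car_infos stream above; it is shown as a bare SELECT for brevity, whereas a real job would insert the result into a sink stream with matching complex columns:

```sql
SELECT
  Array[speeds[1], speeds[2]] as first_two_speeds,                 -- ARRAY construction; indexes start at 1
  Map['city', address.city, 'province', address.province] as tags, -- MAP construction from key/value pairs
  Row(car_id, address.country) as id_with_country                  -- ROW construction with positional fields
FROM car_infos;
```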
Nested Complex Types
- JSON format enhancements
  - Configuring json_schema is supported.
    When json_schema is configured, the fields do not need to be declared in the DDL; they are generated automatically from json_schema. Example:

```sql
CREATE SOURCE STREAM data_with_schema
WITH (
  type = "dis",
  region = "xxx",
  channel = "dis-in",
  encode = "json",
  json_schema = '{"definitions":{"address":{"type":"object","properties":{"street_address":{"type":"string"},"city":{"type":"string"},"state":{"type":"string"}},"required":["street_address","city","state"]}},"type":"object","properties":{"billing_address":{"$ref":"#/definitions/address"},"shipping_address":{"$ref":"#/definitions/address"},"optional_address":{"oneOf":[{"type":"null"},{"$ref":"#/definitions/address"}]}}}'
);

CREATE SINK STREAM buy_infos (
  billing_address_city STRING,
  shipping_address_state STRING
) WITH (
  type = "obs",
  encode = "csv",
  region = "xxx",
  field_delimiter = ",",
  row_delimiter = "\n",
  obs_dir = "bucket/car_infos",
  file_prefix = "over",
  rolling_size = "100m"
);

INSERT INTO buy_infos
SELECT billing_address.city, shipping_address.state
FROM data_with_schema;
```

    Sample data:

```json
{
  "billing_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  },
  "shipping_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  }
}
```
  - Running without json_schema and without json_config is also supported. For how to use json_config, see the open-source Kafka source stream example.
    In this case, the attribute names in the DDL are used by default as the JSON keys for parsing.
    The test data below contains both nested JSON fields, such as billing_address and shipping_address, and non-nested fields, id and type2.

```json
{
  "id": "1",
  "type2": "online",
  "billing_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  },
  "shipping_address": {
    "street_address": "xxx",
    "city": "xxx",
    "state": "xxx"
  }
}
```

    The table creation and usage are as follows:

```sql
CREATE SOURCE STREAM car_info_data (
  id STRING,
  type2 STRING,
  billing_address ROW<street_address STRING, city STRING, state STRING>,
  shipping_address ROW<street_address STRING, city STRING, state STRING>,
  optional_address ROW<street_address STRING, city STRING, state STRING>
) WITH (
  type = "dis",
  region = "xxx",
  channel = "dis-in",
  encode = "json"
);

CREATE SINK STREAM buy_infos (
  id STRING,
  type2 STRING,
  billing_address_city STRING,
  shipping_address_state STRING
) WITH (
  type = "obs",
  encode = "csv",
  region = "xxx",
  field_delimiter = ",",
  row_delimiter = "\n",
  obs_dir = "bucket/car_infos",
  file_prefix = "over",
  rolling_size = "100m"
);

INSERT INTO buy_infos
SELECT id, type2, billing_address.city, shipping_address.state
FROM car_info_data;
```
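    For the test record above, the job emits id, type2, billing_address.city, and shipping_address.state, so (with the placeholder values shown) the expected CSV output line would be:

```
1,online,xxx,xxx
```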
- Sink serialization supports complex types
  - Currently only the CSV and JSON formats support complex types.
  - For JSON, see JSON format enhancements above.
  - Because CSV has no standard format, source parsing is not supported for now; only the sink side is supported.
  - Output format: kept as consistent as possible with native Flink.
    Map: {key1=Value1, key2=Value2}
    Row: fields are flattened and separated by commas, for example Row(1, '2') => 1,'2'
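To make the sink side concrete, here is a minimal sketch of a CSV sink with a ROW column; the stream and column names are hypothetical, and the comments show how the flattening rule above applies:

```sql
-- Hypothetical CSV sink whose ROW column is flattened into the output line.
CREATE SINK STREAM row_out (
  car_id STRING,
  address ROW<city STRING, province STRING>  -- flattened into two comma-separated fields
) WITH (
  type = "obs",
  encode = "csv",
  region = "xxx",
  field_delimiter = ",",
  row_delimiter = "\n",
  obs_dir = "bucket/row_out",
  file_prefix = "demo",
  rolling_size = "100m"
);
-- Following the rule Row(1, '2') => 1,'2', each record's address column is
-- flattened into its two fields in the CSV line (exact quoting depends on the field types).
```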