更新时间:2022-02-22 GMT+08:00

数据类型

概述

数据类型是数据的一个基本属性,用于区分不同类别的数据。不同的数据类型所占的存储空间不同,能够进行的操作也不相同。数据库中的数据存储在数据表中。数据表中的每一列都定义了数据类型,用户存储数据时,须遵从这些数据类型的属性,否则可能会出错。

大数据平台的Flink SQL与开源社区相同,支持原生数据类型、复杂数据类型和复杂类型嵌套。

原生数据类型

Flink SQL支持原生数据类型,请参见表1

表1 原生数据类型

数据类型

描述

存储空间

范围

VARCHAR

可变长度的字符

-

-

BOOLEAN

布尔类型

-

TRUE/FALSE

TINYINT

有符号整数

1字节

-128-127

SMALLINT

有符号整数

2字节

-32768-32767

INT

有符号整数

4字节

-2147483648~2147483647

INTEGER

有符号整数

4字节

-2147483648~2147483647

BIGINT

有符号整数

8字节

-9223372036854775808~9223372036854775807

REAL

单精度浮点型

4字节

-

FLOAT

单精度浮点型

4字节

-

DOUBLE

双精度浮点型

8字节

-

DECIMAL

固定有效位数和小数位数的数据类型

-

-

DATE

日期类型,描述了特定的年月日,以yyyy-MM-dd格式表示,例如2014-05-29

-

DATE类型不包含时间,所表示日期的范围为0000-01-01 to 9999-12-31

TIME

时间类型,以HH:mm:ss表示。

例如20:17:40

-

-

TIMESTAMP(3)

完整日期,包括日期和时间。

例如:1969-07-20 20:17:40

-

-

INTERVAL timeUnit [TO timeUnit]

时间间隔

例如:INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY

-

-

复杂数据类型

Flink SQL支持复杂数据类型和复杂类型嵌套。复杂数据类型如表2所示。

表2 复杂数据类型

数据类型

描述

声明方式

引用方式

构造方式

ARRAY

一组有序字段,所有字段的数据类型必须相同。

ARRAY[TYPE]

变量名[下标],下标从1开始,例如:v1[1]

Array[value1, value2, ...] as v1

MAP

一组无序的键/值对。键的类型必须是原生数据类型,值的类型可以是原生数据类型或复杂数据类型。同一个MAP键的类型必须相同,值的类型也必须相同。

MAP[TYPE,TYPE]

变量名[key],例如:v1[key]

Map[key, value, key2, value2, key3, value3.......] as v1

ROW

一组命名的字段,字段的数据类型可以不同。

ROW<a1 TYPE1, a2 TYPE2>

变量名.字段名,例如:v1.a1

Row('1',2) as v1

使用示例如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  address ROW<city STRING, province STRING, country STRING>,
  average_speed MAP[STRING, LONG],
  speeds ARRAY[LONG]
) 
  WITH (
    type = "dis",
    region =  "xxxx",
    channel = "dliinput",
    encode = "json"
);

CREATE SINK STREAM car_speed_infos (
  car_id STRING,
  province STRING,
  average_speed LONG,
  start_speed LONG
);

INSERT INTO car_speed_infos SELECT
   car_id,
   address.province,
   average_speed[address.city],
   speeds[1]
FROM car_infos;

复杂类型嵌套

  • Json格式增强

    以Source为例进行说明,Sink的使用方法相同。

    • 支持配置Json_schema
      配置了json_schema后,可以不声明DDL中的字段,自动从json_schema中生成。使用示例如下:
      CREATE SOURCE STREAM data_with_schema WITH (
             type = "dis",
             region =  "xxxx",
             channel = "dis-in",
             encode = "json",
             json_schema = '{"definitions":{"address":{"type":"object","properties":{"street_address":{"type":"string"},"city":{"type":"string"},"state":{"type":"string"}},"required":["street_address","city","state"]}},"type":"object","properties":{"billing_address":{"$ref":"#/definitions/address"},"shipping_address":{"$ref":"#/definitions/address"},"optional_address":{"oneOf":[{"type":"null"},{"$ref":"#/definitions/address"}]}}}'
           );
      
           CREATE SINK STREAM buy_infos (
             billing_address_city STRING,
             shipping_address_state string
           ) WITH (
             type = "obs",
             encode = "csv",
             region =  "xxxx" ,
             field_delimiter = ",",
             row_delimiter = "\n",
             obs_dir = "bucket/car_infos",
             file_prefix = "over",
             rolling_size = "100m"
           );
      
           insert into buy_infos select billing_address.city, shipping_address.state from data_with_schema;

      示例数据:

      {
       "billing_address":
        {
         "street_address":"binjiang",
         "city":"hangzhou",
         "state":"zhejiang"
         },
       "shipping_address":
        {
         "street_address":"tianhe",
         "city":"guangzhou",
         "state":"guangdong"
        }
      }
    • 支持不配置json_schema也不配置json_config
      这种情况下默认用ddl中属性名当做json key来进行解析。使用示例如下:
      CREATE SOURCE STREAM car_info_data (
             billing_address Row<street_address string, city string, state string>,
             shipping_address Row<street_address string, city string, state string>,
             optional_address Row<street_address string, city string, state string>
           ) WITH (
             type = "dis",
             region =  "xxxx",
             channel = "dis-in",
             encode = "json"
      	 );
      	
      	 CREATE SINK STREAM buy_infos (
             billing_address_city STRING,
             shipping_address_state string
           ) WITH (
             type = "obs",
             encode = "csv",
             region =  "xxxx" ,
             field_delimiter = ",",
             row_delimiter = "\n",
             obs_dir = "bucket/car_infos",
             file_prefix = "over",
             rolling_size = "100m"
           );
      
           insert into buy_infos select billing_address.city, shipping_address.state from car_info_data;   
  • Sink序列化支持复杂类型
    • 目前只有CSV、Json两种格式支持复杂类型。
    • ​ Json请参考•Json格式增强
    • ​ 由于CSV没有标准的格式,所以目前暂不支持source解析,只支持sink。
    • ​ 输出格式:尽量和flink原生保持一致。

      ​ Map: {key1=Value1, key2=Value2}

      ​ Row: 平摊用逗号分隔属性,如Row(1, '2') => 1,'2'