Parquet Format
功能描述
Apache Parquet格式允许读写 Parquet 数据。更多具体使用可参考开源社区文档:Parquet Format。
支持的Connector
- FileSystem
 
参数说明
| 
        参数  | 
      
        是否必选  | 
      
        默认值  | 
      
        类型  | 
      
        描述  | 
     
|---|---|---|---|---|
| 
        format  | 
      
        是  | 
      
        无  | 
      
        String  | 
      
        指定使用的格式,此处应为"parquet"。  | 
     
| 
        parquet.utc-timezone  | 
      
        否  | 
      
        false  | 
      
        Boolean  | 
      
        使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。  | 
     
数据类型映射
目前,Parquet 格式类型映射与 Apache Hive 兼容,但与 Apache Spark 有所不同:
- Timestamp:不论精度,映射 timestamp 类型至 int96。
 - Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。
 
下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
注意:复合数据类型暂只支持写不支持读(Array、Map 与 Row)。
| 
        Flink数据类型  | 
      
        Parquet类型  | 
      
        Parquet逻辑类型  | 
     
|---|---|---|
| 
        CHAR / VARCHAR / STRING  | 
      
        BINARY  | 
      
        UTF8  | 
     
| 
        BOOLEAN  | 
      
        BOOLEAN  | 
      
        -  | 
     
| 
        BINARY / VARBINARY  | 
      
        BINARY  | 
      
        -  | 
     
| 
        DECIMAL  | 
      
        FIXED_LEN_BYTE_ARRAY  | 
      
        DECIMAL  | 
     
| 
        TINYINT  | 
      
        INT32  | 
      
        INT_8  | 
     
| 
        SMALLINT  | 
      
        INT32  | 
      
        INT_16  | 
     
| 
        INT  | 
      
        INT32  | 
      
        -  | 
     
| 
        BIGINT  | 
      
        INT64  | 
      
        -  | 
     
| 
        FLOAT  | 
      
        FLOAT  | 
      
        -  | 
     
| 
        DOUBLE  | 
      
        DOUBLE  | 
      
        -  | 
     
| 
        DATE  | 
      
        INT32  | 
      
        DATE  | 
     
| 
        TIME  | 
      
        INT32  | 
      
        TIME_MILLIS  | 
     
| 
        TIMESTAMP  | 
      
        INT96  | 
      
        -  | 
     
| 
        ARRAY  | 
      
        -  | 
      
        LIST  | 
     
| 
        MAP  | 
      
        -  | 
      
        MAP  | 
     
| 
        ROW  | 
      
        -  | 
      
        STRUCT  | 
     
示例
使用kafka发送数据,输出到print中。
- 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列> 找到作业的所属队列> 更多> 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
 - 创建flink opensource sql作业,开启checkpoint,并提交运行,其代码如下:
    
    
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic-pattern' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE sink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'filesystem', 'format' = 'parquet', 'path' = 'obs://xx' ); insert into sink select * from kafkaSource;
 - 向kafka的作为source的topic中插入下列数据:
    
    
202103251505050001,appShop,2021-03-25 15:05:05,500.00,400.00,2021-03-25 15:10:00,0003,Cindy,330108 202103241606060001,appShop,2021-03-24 16:06:06,200.00,180.00,2021-03-24 16:10:06,0001,Alice,330106
 - 读取sink表中配置的obs路径中的parquet文件,其数据结果如下
    
    
202103251202020001, miniAppShop, 2021-03-25 12:02:02, 60.0, 60.0, 2021-03-25 12:03:00, 0002, Bob, 330110 202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106