更新时间:2024-04-19 GMT+08:00

Parquet Format

功能描述

Apache Parquet格式允许读写 Parquet 数据。更多具体使用可参考开源社区文档:Parquet Format

支持的Connector

  • FileSystem

参数说明

表1 参数说明

参数

是否必选

默认值

类型

描述

format

String

指定使用的格式,此处应为"parquet"。

parquet.utc-timezone

false

Boolean

使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。

数据类型映射

目前,Parquet 格式类型映射与 Apache Hive 兼容,但与 Apache Spark 有所不同:

  • Timestamp:不论精度,映射 timestamp 类型至 int96。
  • Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。

下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

注意:复合数据类型暂只支持写不支持读(Array、Map 与 Row)。

表2 数据类型映射

Flink数据类型

Parquet类型

Parquet逻辑类型

CHAR / VARCHAR / STRING

BINARY

UTF8

BOOLEAN

BOOLEAN

-

BINARY / VARBINARY

BINARY

-

DECIMAL

FIXED_LEN_BYTE_ARRAY

DECIMAL

TINYINT

INT32

INT_8

SMALLINT

INT32

INT_16

INT

INT32

-

BIGINT

INT64

-

FLOAT

FLOAT

-

DOUBLE

DOUBLE

-

DATE

INT32

DATE

TIME

INT32

TIME_MILLIS

TIMESTAMP

INT96

-

ARRAY

-

LIST

MAP

-

MAP

ROW

-

STRUCT

示例

使用kafka发送数据,输出到print中。

  1. 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列连通性(通用队列> 找到作业的所属队列> 更多> 测试地址连通性 > 输入kafka的地址 > 测试)。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
  2. 创建flink opensource sql作业,开启checkpoint,并提交运行,其代码如下:

    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic-pattern' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    
    CREATE TABLE sink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'filesystem',
      'format' = 'parquet',
      'path' = 'obs://xx'
    );
    insert into sink select * from kafkaSource; 

  3. 向kafka的作为source的topic中插入下列数据:

    202103251505050001,appShop,2021-03-25 15:05:05,500.00,400.00,2021-03-25 15:10:00,0003,Cindy,330108
    
    202103241606060001,appShop,2021-03-24 16:06:06,200.00,180.00,2021-03-24 16:10:06,0001,Alice,330106

  4. 读取sink表中配置的obs路径中的parquet文件,其数据结果如下

    202103251202020001, miniAppShop, 2021-03-25 12:02:02, 60.0, 60.0, 2021-03-25 12:03:00, 0002, Bob, 330110
    
    202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106