更新时间:2024-02-07 GMT+08:00

FileSystem结果表

功能描述

FileSystem sink用于将数据输出到分布式文件系统HDFS或者对象存储服务OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。

考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。

桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。

  • 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。
  • sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE TABLE sink_table (
   name string,
   num INT,
   p_day string,
   p_hour string
) partitioned by (p_day, p_hour) WITH (
   'connector' = 'filesystem',
   'path' = 'obs://*** ',
   'format' = 'parquet',
   'auto-compaction' = 'true'
);

使用说明

  • 滚动策略

    RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后在转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。

    在 STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为 Finished 状态),共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。详见滚动策略相关参数说明

  • Part文件生命周期

    为了在下游使用 FileSink 作为输出,需要了解生成的输出文件的命名和生命周期。

    Part 文件可以处于以下三种状态中的任意一种:

    • In-progress当前正在写入的 Part 文件处于 in-progress 状态
    • Pending由于指定的滚动策略)关闭 in-progress 状态文件,并且等待提交
    • Finished流模式(STREAMING)下的成功的 Checkpoint 或者批模式(BATCH)下输入结束,文件的Pending状态转换为 Finished 状态

    只有 Finished 状态下的文件才能被下游安全读取,并且保证不会被修改。

    默认的,Part文件命名策略如下:

    • In-progress / Pending:part-<uid>-<partFileIndex>.inprogress.uid
    • Finished:part-<uid>-<partFileIndex>

    当Sink Subtask实例化时,uid是一个分配给 Subtask 的随机ID值。uid不具有容错机制,所以当Subtask从故障恢复时,uid会重新生成。

  • 文件合并

    FileSink 开始支持已经提交Pending文件的合并,从而允许应用设置一个较小的时间周期并且避免生成大量的小文件。

    这一功能开启后,在文件转为Pending状态与文件最终提交之间会进行文件合并。这些Pending状态的文件将首先被提交为一个以.开头的临时文件。这些临时文件随后将会按照用户指定的策略和合并方式进行合并,最终生成合并后的Pending状态的文件。 然后这些文件将被发送给Committer并提交为正式文件,在这之后,原始的临时文件也会被删除掉。

  • 分区功能

    Filesystem sink支持分区功能,通过partitioned by语法根据选择的字段进行分区。示例如下:

    path
    └── datetime=2022-06-25
        └── hour=10
            ├── part-0.parquet
            ├── part-1.parquet
    └── datetime=2022-06-26
        └── hour=16
            ├── part-0.parquet
        └── hour=17
            ├── part-0.parquet

    分区和文件一样,也需要进行提交,通知下游应用可以安全地读取分区内的文件。Filesystem sink提供多种提交配置策略。

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

connector

String

固定位filesystem。

path

String

OBS路径。

format

String

文件格式。

支持csv、parquet格式。

sink.rolling-policy.file-size

128MB

MemorySize

单个part文件最大大小,超过该数值会滚动产生新文件。

说明:

RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后在转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为Finished状态)共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。

sink.rolling-policy.rollover-interval

30 min

Duration

单个Part文件处于打开状态的最长时间,超过该时间会滚动产生新文件(默认值30分钟,以避免产生大量小文件)。检查频率是通过sink.rolling-policy.check-interval参数控制的。

说明:

该参数数字与单位之间必须要有空格。

支持的时间单位包括: d,h,min,s,ms等。

对于bulk格式的文件(parquet、orc、avro),checkpoint的时间间隔也会控制单个part文件打开的最长时间。

sink.rolling-policy.check-interval

1 min

Duration

基于时间的滚动策略的检查间隔。

该属性控制了基于sink.rolling-policy.rollover-interval属性检查文件是否该被滚动的检查频率。

auto-compaction

false

Boolean

在流式 sink 中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该checkpoint产生的临时文件会被合并。

compaction.file-size

`sink.rolling-policy.file-size`的大小

MemorySize

合并目标文件大小,默认值为滚动文件大小。

说明:
  • 只有在同个checkpoint内的文件会被合并,因此最终文件的数量至少等于checkpoint的数量。
  • 如果合并时间较长,可能会引起反压,延长checkpoint所需时间。
  • 开启该功能后,checkpoint时会产生最终文件,并打开新的文件接收下个checkpoint产生的数据。

示例一

使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint无关,达到30min或128MB时,生成新文件。
create table orders(
  name string,
  num INT
) with (
  'connector' = 'datagen',
  'rows-per-second' = '100', 
  'fields.name.kind' = 'random', 
  'fields.name.length' = '5' 
);

CREATE TABLE sink_table (
   name string,
   num INT
) WITH (
   'connector' = 'filesystem',
   'path' = 'obs://bucketName/fileName',
   'format' = 'csv',
   'sink.rolling-policy.file-size'='128m',
   'sink.rolling-policy.rollover-interval'='30 min'
);
INSERT into sink_table SELECT * from orders;

示例二

使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到checkpoint间隔或达到100MB时,生成新文件。
create table orders(
  name string,
  num INT
) with (
  'connector' = 'datagen',
  'rows-per-second' = '100', 
  'fields.name.kind' = 'random', 
  'fields.name.length' = '5' 
);

CREATE TABLE sink_table (
   name string,
   num INT
) WITH (
   'connector' = 'filesystem',
   'path' = 'obs://bucketName/fileName',
   'format' = 'csv',
   'sink.rolling-policy.file-size'='128m',
   'sink.rolling-policy.rollover-interval'='30 min',
   'auto-compaction'='true',
   'compaction.file-size'='100m'

);
INSERT into sink_table SELECT * from orders;