更新时间:2024-02-07 GMT+08:00

FileSytem结果表

功能描述

FileSystem结果表用于将数据输出到分布式文件系统HDFS或者对象存储服务OBS等文件系统。数据生成后,可直接对生成的目录创建非DLI表,通过DLI SQL进行下一步处理分析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。

语法格式

1
2
3
4
5
6
7
create table filesystemSink (
  attr_name attr_type (',' attr_name attr_type) *
) with (
  'connector.type' = 'filesystem',
  'connector.file-path' = '',
  'format.type' = ''
);

注意事项

  • 该建表语法的数据输出目录为OBS时,OBS必须为并行文件系统,不能为OBS桶。
  • 使用fileSystem时必须开启checkpoint,保证作业的一致性。
  • format.type为parquet时,支持的数据类型为string, boolean, tinyint, smallint, int, bigint, float, double, map<string, string>, timestamp(3), time。
  • 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启,需要配置为“从checkpoint恢复”。
  • checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,比如10分钟。
  • 使用HDFS时需要绑定相应的跨源,并填写相应的主机信息。
  • 使用hdfs时,请配置主NameNode的所在节点信息。

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

固定为filesystem。

connector.file-path

数据输出目录,格式为: schema://file.path

说明:
当前schame只支持obs和hdfs。
  • 当schema为obs时,表示输出到对象存储服务OBS。注意,OBS必须是并行文件系统,不能是OBS桶。

    示例:obs://bucketName/fileName,表示数据输出到obs的bucketName桶下的fileName目录中。

  • 当schema为hdfs时,表示输出到HDFS。

    示例:hdfs://node-master1sYAx:9820/user/car_infos,其中node-master1sYAx:9820为MRS集群NameNode所在节点信息。

format.type

输出数据编码格式,当前支持“parquet”格式和“csv”格式。

  • 当schema为obs时,输出数据编码格式仅支持“parquet”格式。
  • 当schema为hdfs时,输出数据编码格式支持“parquet”格式和“csv”格式。

format.field-delimiter

属性分隔符。

当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,默认为“,”。

connector.ak

用于访问obs的accessKey

当写入obs时必须填写该字段。

connector.sk

用于访问obs的secretKey

当写入obs时必须填写该字段。

connector.partitioned-by

分区字段,多个字段以“,”分隔

示例

从kafka中读取数据以parquet的格式写到obs的bucketName桶下的fileName目录中。

create table kafkaSource(
  attr0 string,
  attr1 boolean,
  attr2 TINYINT,
  attr3 smallint,
  attr4 int,
  attr5 bigint,
  attr6 float,
  attr7 double,
  attr8 timestamp(3),
  attr9 time
) with (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'test_json',
  'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'connector.properties.group.id' = 'test_filesystem',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'csv'
);

create table filesystemSink(
  attr0 string,
  attr1 boolean,
  attr2 TINYINT,
  attr3 smallint,
  attr4 int,
  attr5 bigint,
  attr6 float,
  attr7 double,
  attr8 map < string,  string >,
  attr9 timestamp(3),
  attr10 time
) with (
  "connector.type" = "filesystem",
  "connector.file-path" = "obs://bucketName/fileName",
  "format.type" = "parquet",
  "connector.ak" = "xxxx",
  "connector.sk" = "xxxxxx"
);

insert into
  filesystemSink
select
  attr0,
  attr1,
  attr2,
  attr3,
  attr4,
  attr5,
  attr6,
  attr7,
  map [attr0,attr0],
  attr8,
  attr9
from
  kafkaSource;