FileSytem结果表
功能描述
FileSystem结果表用于将数据输出到分布式文件系统HDFS或者对象存储服务OBS等文件系统。数据生成后,可直接对生成的目录创建非DLI表,通过DLI SQL进行下一步处理分析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。
语法格式
1 2 3 4 5 6 7 |
create table filesystemSink ( attr_name attr_type (',' attr_name attr_type) * ) with ( 'connector.type' = 'filesystem', 'connector.file-path' = '', 'format.type' = '' ); |
注意事项
- 该建表语法的数据输出目录为OBS时,OBS必须为并行文件系统,不能为OBS桶。
- 使用fileSystem时必须开启checkpoint,保证作业的一致性。
- format.type为parquet时,支持的数据类型为string, boolean, tinyint, smallint, int, bigint, float, double, map<string, string>, timestamp(3), time。
- 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启,需要配置为“从checkpoint恢复”。
- checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,比如10分钟。
- 使用HDFS时需要绑定相应的跨源,并填写相应的主机信息。
- 使用hdfs时,请配置主NameNode的所在节点信息。
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
固定为filesystem。 |
connector.file-path |
是 |
数据输出目录,格式为: schema://file.path。 |
format.type |
是 |
输出数据编码格式,当前支持“parquet”格式和“csv”格式。
|
format.field-delimiter |
否 |
属性分隔符。 当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,默认为“,”。 |
connector.ak |
否 |
用于访问obs的accessKey 当写入obs时必须填写该字段。 |
connector.sk |
否 |
用于访问obs的secretKey 当写入obs时必须填写该字段。 |
connector.partitioned-by |
否 |
分区字段,多个字段以“,”分隔 |
示例
从kafka中读取数据以parquet的格式写到obs的bucketName桶下的fileName目录中。
create table kafkaSource( attr0 string, attr1 boolean, attr2 TINYINT, attr3 smallint, attr4 int, attr5 bigint, attr6 float, attr7 double, attr8 timestamp(3), attr9 time ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test_json', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test_filesystem', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'csv' ); create table filesystemSink( attr0 string, attr1 boolean, attr2 TINYINT, attr3 smallint, attr4 int, attr5 bigint, attr6 float, attr7 double, attr8 map < string, string >, attr9 timestamp(3), attr10 time ) with ( "connector.type" = "filesystem", "connector.file-path" = "obs://bucketName/fileName", "format.type" = "parquet", "connector.ak" = "xxxx", "connector.sk" = "xxxxxx" ); insert into filesystemSink select attr0, attr1, attr2, attr3, attr4, attr5, attr6, attr7, map [attr0,attr0], attr8, attr9 from kafkaSource;