更新时间:2022-12-07 GMT+08:00

文件系统输出流(推荐)

功能描述

创建sink流将数据输出到分布式文件系统(HDFS)或者对象存储服务(OBS)等文件系统。数据生成后,可直接对生成的目录创建表,通过DLI SQL进行下一步处理分析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。

对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。

语法格式

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  [PARTITIONED BY (attr_name (',' attr_name)*]
  WITH (
    type = "filesystem",
    file.path = "obs://bucket/xx",
    encode = "parquet",
    ak = "",
    sk = ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

输出流类型。“type”“filesystem”,表示输出数据到文件系统。

file.path

输出目录,格式为: schema://file.path。

当前schame只支持obs和hdfs。

  • 当schema为obs时,表示输出到对象存储服务OBS。
  • 当schema为hdfs时,表示输出到HDFS。HDFS需要配置代理用户,具体请参考HDFS代理用户配置

    示例:hdfs://node-master1sYAx:9820/user/car_infos,其中node-master1sYAx:9820为MRS集群NameNode所在节点信息。

encode

输出数据编码格式,当前支持“parquet”格式和“csv”格式。

  • 当schema为obs时,输出数据编码格式仅支持“parquet”格式。
  • 当schema为hdfs时,输出数据编码格式支持“parquet”格式和“csv”格式。

ak

输出到OBS时该参数必填。用于访问OBS认证的accessKey,可使用全局变量,屏蔽敏感信息。

sk

输出到OBS时该参数必填。用于访问OBS认证的secretKey,可使用全局变量,屏蔽敏感信息。

krb_auth

创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的MRS集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。

field_delimiter

属性分隔符。

当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,如:“,”

注意事项

  • 使用文件系统输出流的Flink作业必须开启checkpoint,保证作业的一致性。
  • 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启或者手动重启,需要配置为“从checkpoint恢复”
  • checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,比如10分钟。
  • checkpoint支持如下两种模式:
    • AtLeastOnce:事件至少被处理一次。
    • ExactlyOnce:事件仅被处理一次。
  • 使用文件系统输出流写入数据到OBS时,应避免多个作业写同一个目录的情况。
    • OBS对象存储桶的默认行为为覆盖写,可能导致数据丢失。
    • OBS并行文件系统桶的默认行为追加写,可能导致数据混淆。

    因为以上OBS桶类型行为的区别,为避免作业异常重启可能导致的数据异常问题,请根据您的业务需求选择OBS桶类型。

HDFS代理用户配置

  1. 登录MRS管理页面。
  2. 选择MRS的HDFS Namenode配置,在“自定义”中添加配置参数。

    其中,core-site值名称“hadoop.proxyuser.myname.hosts”“hadoop.proxyuser.myname.groups”中的“myname”为传入的krb认证用户名称。

    需要保证写入HDFS数据路径权限为777。

  3. 配置完成后,单击“保存配置”进行保存。

示例

  • 示例一:

    该示例将car_info数据,以buyday字段为分区字段,parquet为编码格式,转储数据到OBS。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    create sink stream car_infos (
      carId string,
      carOwner string,
      average_speed double,
      buyday string
      ) partitioned by (buyday)
      with (
        type = "filesystem",
        file.path = "obs://obs-sink/car_infos",
        encode = "parquet",
        ak = "{{myAk}}",
        sk = "{{mySk}}"
    );
    

    数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/buyday=xx/part-x-x。

    数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理:

    1. 创建OBS分区表。
      1
      2
      3
      4
      5
      6
      7
      8
      create table car_infos (
        carId string,
        carOwner string,
        average_speed double
      )
        partitioned by (buyday string)
        stored as parquet
        location 'obs://obs-sink/car_infos';
      
    2. 从关联OBS路径中恢复分区信息。
      1
      alter table car_infos recover partitions;
      
  • 示例二

    该示例将car_info数据,以buyday字段为分区字段,csv为编码格式,转储数据到HDFS。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    create sink stream car_infos (
      carId string,
      carOwner string,
      average_speed double,
      buyday string
      ) partitioned by (buyday)
      with (
        type = "filesystem",
        file.path = "hdfs://node-master1sYAx:9820/user/car_infos",
        encode = "csv",
        field_delimiter = ","
    );
    

    数据最终在HDFS中的存储目录结构为:/user/car_infos/buyday=xx/part-x-x。