更新时间:2024-09-27 GMT+08:00

DIS输出流

功能描述

DLI将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。

数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。

语法格式

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dis",
    region = "",
    channel = "",
    partition_key = "",
    encode= "",
    field_delimiter= ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,dis表示输出到数据接入服务。

region

数据所在的DIS所在区域。

ak

访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证

sk

Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证

channel

DIS通道。

partition_key

数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。

encode

数据编码格式,可选为“csv”“json”“user_defined”

说明:
  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“json”,则需使用“enable_output_null”来配置是否输出空字段,具体见示例。
  • 若编码格式为“user_defined”,则需配置“encode_class_name”“encode_class_parameter”属性。

field_delimiter

属性分隔符。

  • 当编码格式为csv时,需要设置属性分隔符,用户可以自定义,如:
  • 当编码格式为json时,则不需要设置属性之间的分隔符。

json_config

当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2”。

enable_output_null

当编码格式为json时,需使用该参数来配置是否输出空字段。

当该参数为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。默认值为“true”。

encode_class_name

当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。

encode_class_parameter

当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。

注意事项

无。

示例

  • CSV编码格式:数据输出到DIS通道,使用csv编码,并且以逗号为分隔符,多个分区用car_owner做为key进行分发。数据输出示例:"ZJA710XC", "lilei", "BMW", 700000。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    CREATE SINK STREAM audi_cheaper_than_30w (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT
    )
      WITH (
        type = "dis",
        region = "xxx",
        channel = "dlioutput",
        encode = "csv",
        field_delimiter = ","
    ); 
    
  • JSON编码格式:数据输出到DIS通道,使用json编码,多个分区用car_owner,car_brand 做为key进行分发,“enableOutputNull”“true”表示输出空字段(值为null),若为“false”表示不输出空字段。数据示例:"car_id ":"ZJA710XC", "car_owner ":"lilei", "car_brand ":"BMW", "car_price ":700000。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    CREATE SINK STREAM audi_cheaper_than_30w (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT
    )
      WITH (
        type = "dis",
        channel = "dlioutput",
        region = "xxx",
        partition_key = "car_owner,car_brand",
        encode = "json",
        enable_output_null = "false"
    );