DIS输出流
功能描述
DLI将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。
数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
语法格式
1 2 3 4 5 6 7 8 9 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_key = "", encode= "", field_delimiter= "" ); |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,dis表示输出到数据接入服务。 |
region |
是 |
数据所在的DIS所在区域。 |
ak |
否 |
访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 |
sk |
否 |
Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 |
channel |
是 |
DIS通道。 |
partition_key |
否 |
数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。 |
encode |
是 |
数据编码格式,可选为“csv”、“json”和“user_defined”。
说明:
|
field_delimiter |
是 |
属性分隔符。
|
json_config |
否 |
当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2”。 |
enable_output_null |
否 |
当编码格式为json时,需使用该参数来配置是否输出空字段。 当该参数为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。默认值为“true”。 |
encode_class_name |
否 |
当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 |
encode_class_parameter |
否 |
当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 |
注意事项
无。
示例
- CSV编码格式:数据输出到DIS通道,使用csv编码,并且以逗号为分隔符,多个分区用car_owner做为key进行分发。数据输出示例:"ZJA710XC", "lilei", "BMW", 700000。
1 2 3 4 5 6 7 8 9 10 11 12 13
CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "xxx", channel = "dlioutput", encode = "csv", field_delimiter = "," );
- JSON编码格式:数据输出到DIS通道,使用json编码,多个分区用car_owner,car_brand 做为key进行分发,“enableOutputNull”为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。数据示例:"car_id ":"ZJA710XC", "car_owner ":"lilei", "car_brand ":"BMW", "car_price ":700000。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", channel = "dlioutput", region = "xxx", partition_key = "car_owner,car_brand", encode = "json", enable_output_null = "false" );