文档首页 > > SQL语法参考> 流作业SQL语法> 自拓展生态> 自拓展输出流

自拓展输出流

分享
更新时间: 2020/05/12 GMT+08:00

用户可通过编写代码实现将DLI处理之后的数据写入指定的云生态或者开源生态。

语法格式

CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "user_defined",
    type_class_name = "",
    type_class_parameter = ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,"user_defined"表示数据源为用户自定义数据源。

type_class_name

用户实现获取源数据的sink类名称,注意包含完整包路径。

type_class_parameter

用户自定义sink类的入参,仅支持一个string类型的参数。

注意事项

用户自定义sink类需要继承类RichSinkFunction,并指定数据类型为Row例如定义类MySink:public class MySink extends RichSinkFunction<Row>{},重点实现其中的open、invoke和close函数。

依赖pom:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-core</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>

示例

实现数据以CSV编码写入DIS通道。

1
2
3
4
5
6
7
8
CREATE SINK STREAM user_out_data (
	count INT
)
  WITH (  
	type = "user_defined", 
	type_class_name = "mySourceSink.MySink", 
	type_class_parameter = ""
      );

自定义sink类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。

分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问