更新时间:2022-08-12 GMT+08:00

自拓展输出流

用户可通过编写代码实现将DLI处理之后的数据写入指定的云生态或者开源生态。

语法格式

CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "user_defined",
    type_class_name = "",
    type_class_parameter = ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,"user_defined"表示数据源为用户自定义数据源。

type_class_name

用户实现获取源数据的sink类名称,注意包含完整包路径。

type_class_parameter

用户自定义sink类的入参,仅支持一个string类型的参数。

注意事项

用户自定义sink类需要继承类RichSinkFunction,并指定数据类型为Row例如定义类MySink:public class MySink extends RichSinkFunction<Row>{},重点实现其中的open、invoke和close函数。

依赖pom:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-core</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>

示例

实现数据以CSV编码写入DIS通道。

1
2
3
4
5
6
7
8
CREATE SINK STREAM user_out_data (
	count INT
)
  WITH (  
	type = "user_defined", 
	type_class_name = "mySourceSink.MySink", 
	type_class_parameter = ""
      );

自定义sink类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。