更新时间:2022-12-07 GMT+08:00
自拓展输出流
用户可通过编写代码实现将DLI处理之后的数据写入指定的云生态或者开源生态。
语法格式
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" );
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,"user_defined"表示数据源为用户自定义数据源。 |
type_class_name |
是 |
用户实现获取源数据的sink类名称,注意包含完整包路径。 |
type_class_parameter |
是 |
用户自定义sink类的入参,仅支持一个string类型的参数。 |
注意事项
用户自定义sink类需要继承类RichSinkFunction,并指定数据类型为Row例如定义类MySink:public class MySink extends RichSinkFunction<Row>{},重点实现其中的open、invoke和close函数。
依赖pom:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
示例
实现数据以CSV编码写入DIS通道。
1 2 3 4 5 6 7 8 |
CREATE SINK STREAM user_out_data ( count INT ) WITH ( type = "user_defined", type_class_name = "mySourceSink.MySink", type_class_parameter = "" ); |
自定义sink类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。
父主题: 自拓展生态