更新时间:2022-09-29 GMT+08:00
userDefined结果表
功能描述
您可通过编写代码实现将DLI处理之后的数据写入到指定的云生态或者开源生态。
前提条件
已编写代码实现自定义sink类:
自定义sink类需要继承Flink开源类:RichSinkFunction,并指定数据类型为:Tuple2<Boolean, Row>。
例如开发自定义类MySink:public class MySink extends RichSinkFunction< Tuple2<Boolean, Row>>{},需重点实现其中的open、invoke和close函数。代码参考示例如下:
public class MySink extends RichSinkFunction<Tuple2<Boolean, Row>> { // 初始化 @Override public void open(Configuration parameters) throws Exception {} @Override //业务数据处理逻辑具体实现 /*in包括两个值,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,当为false时,可不进行任何操作。第二个值表示实际的数据值*/ public void invoke(Tuple2<Boolean, Row> in, Context context) throws Exception {} @Override public void close() throws Exception {} }
依赖的pom配置文件内容参考如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
实现完成后将该类编译打包在Jar中,通过Flink OpenSource SQL作业编辑页的UDF Jar参数上传。
语法格式
create table userDefinedSink ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'user-defined', 'connector.class-name' = '' );
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
只能为user-defined,表示使用自定义的sink。 |
connector.class-name |
是 |
sink函数的全限定类名。sink类的具体实现可以参考前提条件说明。 |
connector.class-parameter |
否 |
sink函数其构造函数的参数,只支持一个String类型的参数。 |
注意事项
connector.class-name需要为全限定类名。
示例
create table userDefinedSink ( attr1 int, attr2 int ) with ( 'connector.type' = 'user-defined', 'connector.class-name' = 'xx.xx.MySink' );
父主题: 创建结果表