更新时间:2022-07-12 GMT+08:00

userDefined结果表

功能描述

您可通过编写代码实现将DLI处理之后的数据写入到指定的云生态或者开源生态。

前提条件

已编写代码实现自定义sink类:

自定义sink类需要继承Flink开源类:RichSinkFunction,并指定数据类型为:Tuple2<Boolean, Row>。

例如开发自定义类MySink:public class MySink extends RichSinkFunction< Tuple2<Boolean, Row>>{},需重点实现其中的open、invoke和close函数。代码参考示例如下:
public class MySink extends RichSinkFunction<Tuple2<Boolean, Row>> {
    // 初始化
    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    //业务数据处理逻辑具体实现
    /*in包括两个值,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,当为false时,可不进行任何操作。第二个值表示实际的数据值*/
    public void invoke(Tuple2<Boolean, Row> in, Context context) throws Exception {}

    @Override
    public void close() throws Exception {}
}

依赖的pom配置文件内容参考如下:

<dependency>
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-java_2.11</artifactId> 
    <version>${flink.version}</version> 
    <scope>provided</scope> 
</dependency> 

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-core</artifactId> 
    <version>${flink.version}</version> 
    <scope>provided</scope> 
</dependency>

实现完成后将该类编译打包在Jar中,通过Flink OpenSource SQL作业编辑页的UDF Jar参数上传。

语法格式

create table userDefinedSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector.type' = 'user-defined',
  'connector.class-name' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

只能为user-defined,表示使用自定义的sink。

connector.class-name

sink函数的全限定类名。sink类的具体实现可以参考前提条件说明。

connector.class-parameter

sink函数其构造函数的参数,只支持一个String类型的参数。

注意事项

connector.class-name需要为全限定类名。

示例

create table userDefinedSink (
  attr1 int,
  attr2 int
)
with (
  'connector.type' = 'user-defined',
  'connector.class-name' = 'xx.xx.MySink'
);