User-defined Result Table
Function
Write custom Java code that inserts processed data into a database supported by your cloud service.
Prerequisites
Implement the custom sink class:
The custom sink class must extend the open-source Flink class RichSinkFunction, with Tuple2<Boolean, Row> as the data type.
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;

    public class MySink extends RichSinkFunction<Tuple2<Boolean, Row>> {
        // Initialize the object.
        @Override
        public void open(Configuration parameters) throws Exception {}

        // Implement the data processing logic.
        /* The in parameter contains two values. The first value is of the
           Boolean type: true indicates an insert or update operation, and
           false indicates a delete operation. If the connected sink does not
           support the delete operation, the deletion is not executed.
           The second value is the data to be operated on. */
        @Override
        public void invoke(Tuple2<Boolean, Row> in, Context context) throws Exception {}

        // Release resources held by the object.
        @Override
        public void close() throws Exception {}
    }
The required dependencies in the pom configuration file are as follows:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
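The meaning of the Boolean flag passed to invoke can be illustrated without any Flink dependency. The following is only a minimal sketch: the class ChangeApplier, its in-memory map, and the choice of the first field as the key are assumptions made for illustration and are not part of Flink or of the service. A real sink would issue the corresponding write or delete against the target database inside invoke.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a target store; not part of Flink or the service.
final class ChangeApplier {
    // Rows keyed by their first field (an assumption made for this sketch).
    private final Map<Object, List<Object>> store = new LinkedHashMap<>();

    // flag == true: insert or update the row; flag == false: delete the row.
    void apply(boolean flag, List<Object> row) {
        Object key = row.get(0);
        if (flag) {
            store.put(key, row);
        } else {
            store.remove(key);
        }
    }

    // Returns a copy of the current contents so callers cannot mutate the store.
    Map<Object, List<Object>> snapshot() {
        return new LinkedHashMap<>(store);
    }
}
```

In this model, applying a record with the flag set to true for an existing key overwrites the stored row, which mirrors the "insert or update" wording, and a record with the flag set to false removes it.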
Compile the implemented class and package it into a JAR file, then upload the JAR file using the UDF Jar parameter on the editing page of your Flink OpenSource SQL job.
Syntax
    create table userDefinedSink (
      attr_name attr_type
      (',' attr_name attr_type)*
    )
    with (
      'connector.type' = 'user-defined',
      'connector.class-name' = ''
    );
Parameters
Parameter | Mandatory | Description |
---|---|---|
connector.type | Yes | Connector type. Set this parameter to user-defined. |
connector.class-name | Yes | Fully qualified class name of the sink class. For details about the implementation of the sink class, see Prerequisites. |
connector.class-parameter | No | Parameter passed to the constructor of the sink class. Only one parameter of the string type is supported. |
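Because connector.class-parameter accepts only a single string, a sink that needs several settings has to encode them in that one value. The following is a minimal sketch under stated assumptions: the class SinkOptions and the key=value;key=value encoding are hypothetical and are not defined by the service. A sink class could pass the string it receives in its constructor to a parser of this kind.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses "key=value;key=value" from one constructor string.
final class SinkOptions {
    private final Map<String, String> values = new LinkedHashMap<>();

    SinkOptions(String classParameter) {
        // A missing or blank parameter yields an empty option set.
        if (classParameter == null || classParameter.trim().isEmpty()) {
            return;
        }
        for (String pair : classParameter.split(";")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate stray separators
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            // Split on the first '=' only, so values may themselves contain '='.
            values.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
    }

    // Returns the value for key, or defaultValue if the key was not supplied.
    String get(String key, String defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }
}
```

With this encoding, a value such as 'url=jdbc:example://host:3306/db;table=t1' would be split into two options. Values that contain a semicolon are not supported by this sketch.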
Precautions
connector.class-name must be a fully qualified class name.
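A fully qualified class name includes the package, for example xx.xx.MySink rather than MySink. As a minimal sketch, the hypothetical helper below (ClassNameCheck is not part of Flink or the service) shows how to obtain such a name from a class and how to check locally that a name can be resolved on the classpath; a JDK class is used as a stand-in for the sink class.

```java
// Hypothetical helper for checking class names before using them in a job.
final class ClassNameCheck {
    // Returns the fully qualified name, e.g. "java.util.ArrayList".
    static String qualifiedNameOf(Class<?> type) {
        return type.getName();
    }

    // Returns true if the name resolves to a class on the current classpath.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

A simple name without its package, such as ArrayList, does not resolve, whereas java.util.ArrayList does.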
Example
    create table userDefinedSink (
      attr1 int,
      attr2 int
    )
    with (
      'connector.type' = 'user-defined',
      'connector.class-name' = 'xx.xx.MySink'
    );