Custom Sink Stream
Compile code to write the data processed by DLI to a specified cloud ecosystem or open-source ecosystem.
Syntax
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" );
Keywords
Parameter |
Mandatory |
Description |
---|---|---|
type |
Yes |
Data source type. The value user_defined indicates that the data source is a user-defined data source. |
type_class_name |
Yes |
Name of the sink class for obtaining source data. The value must contain the complete package path. |
type_class_parameter |
Yes |
Input parameter of the user-defined sink class. Only one parameter of the string type is supported. |
Precautions
The user-defined sink class needs to inherit the RichSinkFunction class and specify the data type as Row. For example, define MySink class: public class MySink extends RichSinkFunction<Row>{}. It aims to implement the open, invoke, and close functions.
Dependency pom:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
Example
Writing data encoded in CSV format to a DIS stream is used as an example.
1 2 3 4 5 6 7 8 |
CREATE SINK STREAM user_out_data ( count INT ) WITH ( type = "user_defined", type_class_name = "mySourceSink.MySink", type_class_parameter = "" ); |
To customize the implementation of the sink class, you need to pack the class in a JAR package and upload the UDF function on the SQL editing page.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.