Open-Source Kafka Sink Stream
Function
DLI exports the output data of the Flink job to Kafka.
Apache Kafka is a fast, scalable, and fault-tolerant distributed message publishing and subscription system. It delivers high throughput and built-in partitions and provides data replicas and fault tolerance. Apache Kafka is applicable to scenarios of handling massive messages.
Prerequisites
- Kafka is an offline cluster. You need to use the enhanced datasource connection function to connect Flink jobs to Kafka. You can also set security group rules as required.
Syntax
| 1 2 3 4 5 6 7 | CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" ) | 
Keywords
| Parameter | Mandatory | Description | 
|---|---|---|
| type | Yes | Output channel type. kafka indicates that data is exported to Kafka. | 
| kafka_bootstrap_servers | Yes | Port that connects DLI to Kafka. Use enhanced datasource connections to connect DLI queues with Kafka clusters. | 
| kafka_topic | Yes | Kafka topic into which DLI writes data. | 
| encode | Yes | Data encoding format. The value can be csv, json, or user_defined. 
 | 
| filed_delimiter | No | If encode is set to csv, you can use this parameter to specify the separator between CSV fields. By default, the comma (,) is used. | 
| encode_class_name | No | If encode is set to user_defined, you need to set this parameter to the name of the user-defined decoding class (including the complete package path). The class must inherit the DeserializationSchema class. | 
| encode_class_parameter | No | If encode is set to user_defined, you can set this parameter to specify the input parameter of the user-defined decoding class. Only one parameter of the string type is supported. | 
| kafka_properties | No | This parameter is used to configure the native attributes of Kafka. The format is key1=value1;key2=value2. | 
| kafka_certificate_name | No | Name of the datasource authentication information. This parameter is valid only when the datasource authentication type is set to Kafka_SSL. 
         NOTE: 
         
 | 
Precautions
None
Example
Output the data in the kafka_sink stream to Kafka.
| 1 2 3 4 5 6 7 | CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" ); | 
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot 
    