开源Kafka输出流
功能描述
DLI将Flink作业的输出数据输出到Kafka中。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
前提条件
语法格式
1 2 3 4 5 6 7 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH(
type = "kafka",
kafka_bootstrap_servers = "",
kafka_topic = "",
encode = "json"
)
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,"kafka"表示输出到Kafka中。 |
kafka_bootstrap_servers |
是 |
Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 |
kafka_topic |
是 |
写入的topic |
encode |
是 |
数据编码格式,可选为“csv”、“json”和“user_defined”。
|
filed_delimiter |
否 |
当encode为csv时,用于指定各字段分隔符,默认为逗号。 |
encode_class_name |
否 |
当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 |
encode_class_parameter |
否 |
当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 |
kafka_properties |
否 |
可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" |
kafka_certificate_name |
否 |
跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。
说明:
|
注意事项
无。
示例
将流kafka_sink的数据输出到Kafka中。
1 2 3 4 5 6 7 |
CREATE SINK STREAM kafka_sink (name STRING)
WITH (
type="kafka",
kafka_bootstrap_servers = "ip1:port1,ip2:port2",
kafka_topic = "testsink",
encode = "json"
);
|