MRS Kafka输出流
功能描述
DLI将Flink作业的输出数据输出到Kafka中。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。MRS基于Apache Kafka在平台部署并托管了Kafka集群。
前提条件
- Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》中修改主机信息章节。
- Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参见《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
1 2 3 4 5 6 7 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" ) |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,"kafka"表示输出到Kafka中。 |
kafka_bootstrap_servers |
是 |
Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 |
kafka_topic |
是 |
写入的topic。 |
encode |
是 |
编码格式,当前支持“json”和“user_defined”。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 |
encode_class_name |
否 |
当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 |
encode_class_parameter |
否 |
当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 |
krb_auth |
否 |
创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的MRS集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 |
kafka_properties |
否 |
可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" |
kafka_certificate_name |
否 |
跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。
说明:
|
注意事项
无。
示例
将数据输出到Kafka中。
- 示例一
1 2 3 4 5 6 7
CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" );
- 示例二
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
CREATE SINK STREAM kafka_sink ( a1 string, a2 string, a3 string, a4 INT ) // 输出字段 WITH ( type="kafka", kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093", kafka_topic = "testflink", // 写入的topic encode = "csv", // 编码格式,支持json/csv kafka_certificate_name = "Flink", kafka_properties_delimiter = ",", kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL" );