文档首页 > > SQL语法参考> 流作业SQL语法> 创建输出流> 开源Kafka输出流

开源Kafka输出流

分享
更新时间: 2020/05/12 GMT+08:00

功能描述

DLI将Flink作业的输出数据输出到Kafka中。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

前提条件

  • Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》增强型跨源连接章节。
  • Kafka是线下集群,需要通过VPC服务的对等连接功能将Flink作业与Kafka进行对接。

    如何建立增强型跨源连接,请参考《数据湖探索用户指南》增强型跨源连接章节。

语法格式

1
2
3
4
5
6
7
CREATE SINK STREAM kafka_sink (name STRING)
  WITH(
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  )

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,"kafka"表示输出到Kafka中。

kafka_bootstrap_servers

Kafka的连接端口,需要确保能连通(需要通过对等连接的方式开通DLI队列和Kafka集群的连接)。

kafka_topic

写入的topic

encode

数据编码格式,可选为“csv”“json”“user_defined”

说明:
  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“user_defined”,则需配置“encode_class_name”“encode_class_parameter”属性。

filed_delimiter

当encode为csv时,用于指定各字段分隔符,默认为逗号。

encode_class_name

当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。

encode_class_parameter

当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。

kafka_properties

可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2"

kafka_certificate_name

跨源认证信息名称。数据源类型“type”为“Kafka_SSL”时,该参数有效。

说明:
  • 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。
  • Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。

注意事项

无。

示例

将流kafka_sink的数据输出到Kafka中。

1
2
3
4
5
6
7
CREATE SINK STREAM kafka_sink (name STRING) 
  WITH (
    type="kafka",
    kafka_bootstrap_servers =  "ip1:port1,ip2:port2",
    kafka_topic = "testsink",
    encode = "json" 
  );
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问