开源Kafka输入流
功能描述
创建source流从Kafka获取数据,作为作业的输入数据。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
前提条件
- Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》中修改主机信息章节。
- Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
1 2 3 4 5 6 7 8 9 |
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "kafka", kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json", json_config="" ); |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,“Kafka”表示数据源。 |
kafka_bootstrap_servers |
是 |
Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 |
kafka_group_id |
否 |
group id。 |
kafka_topic |
是 |
读取的Kafka的topic。目前只支持读取单个topic。 |
encode |
是 |
数据编码格式,可选为“csv”、“json”、“blob”和“user_defined”。
|
encode_class_name |
否 |
当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 |
encode_class_parameter |
否 |
当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 |
json_config |
否 |
当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系。 格式:"field1=json_field1;field2=json_field2" 格式说明:field1、field2为创建的表字段名称。json_field1、json_field2为kafka输入数据json串的key字段名称。 具体使用方法可以参考示例说明。
说明:
如果定义的source stream中的属性和json中的属性名称相同,json_configs可以不用配置。 |
field_delimiter |
否 |
当encode为csv时,用于指定csv字段分隔符,默认为逗号。 |
quote |
否 |
可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。
说明:
|
start_time |
否 |
kafka数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。 该参数配置后,只会读取Kafka topic在该时间点后产生的数据。 |
kafka_properties |
否 |
可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2"。具体的属性值可以参考Apache Kafka中的描述。 |
kafka_certificate_name |
否 |
跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。
说明:
|
注意事项
在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型。
示例
- 从Kafka读取对象为test的topic。数据编码格式为json且不含嵌套,例如:{"attr1": "lilei", "attr2": 18}。
1 2 3 4 5 6 7 8 9
CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json", json_config = "name=attr1;age=attr2" );
- 从Kafka读取对象为test的topic。数据编码格式为json且包含嵌套。本示例使用了复杂数据类型ROW,ROW使用语法可以参考数据类型。
测试数据参考如下:
{ "id":"1", "type2":"online", "data":{ "patient_id":1234, "name":"bob1234" } }
则对应建表语句示例为:CREATE SOURCE STREAM kafka_source ( id STRING, type2 STRING, data ROW< patient_id STRING, name STRING> ) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json" ); CREATE SINK STREAM kafka_sink ( id STRING, type2 STRING, patient_id STRING, name STRING ) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "csv" ); INSERT INTO kafka_sink select id, type2, data.patient_id, data.name from kafka_source;