Kafka输入流
概述
创建source流从Kafka获取数据,作为作业的输入数据。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
前提条件
语法
语法格式
CREATE SOURCE STREAM kafka_source (name STRING, age int)WITH (type = "kafka",kafka_bootstrap_servers = "",kafka_group_id = "",kafka_topic = "",encode = "json",json_config="")(TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME
语法说明
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,“Kafka”表示数据源。 |
kafka_bootstrap_servers |
是 |
Kafka的连接端口,需要确保能连通(需要通过对等连接的方式开通CS集群和Kafka集群的连接)。 |
kafka_group_id |
是 |
group id。 |
kafka_topic |
是 |
读取的Kafka的topic。 |
encode |
是 |
数据编码格式,支持“json”,“csv”、“blob”和“user_defined”。
|
json_config |
否 |
当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系,格式为"field1=json_field1;field2=json_field2"。 |
field_delimiter |
否 |
当encode为csv时,用于指定csv字段分隔符,默认为逗号。 |
encode_class_name |
否 |
当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 |
encode_class_parameter |
否 |
当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 |
quote |
否 |
可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。
说明:
设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 |
timeindicator |
否 |
在流中增加时间戳,可增加“processing time”时间戳或者“event time”时间戳。 说明:
|
start_time |
否 |
kafka数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。 |
kafka_properties |
否 |
可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" |
注意事项
用来做时间戳的属性类型必须为long或者timestamp。
示例
从Kafka读取对象为test的topic。数据实例:{"attr1": "lilei", "attr2": 18}。
CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json", json_config = "name=attr1;age=attr2" );
