文档首页 > > 流生态开发指南> 开源生态:Apache Kafka> Kafka输入流

Kafka输入流

分享
更新时间: 2019/08/27 GMT+08:00

概述

创建source流从Kafka获取数据,作为作业的输入数据。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

前提条件

  • Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到CS集群中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《实时流计算服务用户指南》集群管理章节中的“添加IP域名映射”部分。
  • Kafka是线下集群,需要通过VPC服务的对等连接功能将CS服务与Kafka进行对接。

    如何建立对等连接,请参见《实时流计算服务用户指南》对等连接章节。

语法

语法格式

CREATE SOURCE STREAM kafka_source (name STRING, age int)WITH (type = "kafka",kafka_bootstrap_servers = "",kafka_group_id = "",kafka_topic = "",encode = "json",json_config="")(TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

语法说明

表1 语法说明

参数

是否必选

说明

type

数据源类型,“Kafka”表示数据源。

kafka_bootstrap_servers

Kafka的连接端口,需要确保能连通(需要通过对等连接的方式开通CS集群和Kafka集群的连接)。

kafka_group_id

group id。

kafka_topic

读取的Kafka的topic。

encode

数据编码格式,支持“json”“csv”“blob”“user_defined”

  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“json”,则需配置“json_config”属性。
  • 当编码格式为“blob”时,表示不对接收的数据进行解析,流属性仅能有一个且为Array[TINYINT]类型。
  • 若编码格式为“user_defined”,则需配置“encode_class_name”“encode_class_parameter”属性。

json_config

当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系,格式为"field1=json_field1;field2=json_field2"。

field_delimiter

当encode为csv时,用于指定csv字段分隔符,默认为逗号。

encode_class_name

当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。

encode_class_parameter

当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。

quote

可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。

  • 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
  • 当引用符号为单引号时,则设置quote = "'"。
说明:

设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。

timeindicator

在流中增加时间戳,可增加“processing time”时间戳或者“event time”时间戳。

说明:
  • 若设置“processing time”,则为proctime.proctime。

    当设置了proctime.proctime时,会在原有属性字段基础上多增加一个proctime系统时间戳属性,假设原有字段为3个,设置了proctime.proctime后会变成4个,设置rowtime属性字段不会发生变化。

  • 若设置“event time”,可选择流中的某个属性来作为时间戳,格式为attr_name.rowtime。
  • 以上两者可以同时设置。

start_time

kafka数据读取起始时间。

当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。

kafka_properties

可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2"

注意事项

用来做时间戳的属性类型必须为long或者timestamp。

示例

从Kafka读取对象为test的topic。数据实例:{"attr1": "lilei", "attr2": 18}。

CREATE SOURCE STREAM kafka_source (name STRING, age int)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2", 
  kafka_group_id = "sourcegroup1", 
  kafka_topic = "test",
  encode = "json",
  json_config = "name=attr1;age=attr2"
);
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区