更新时间:2022-12-07 GMT+08:00

开源Kafka输入流

功能描述

创建source流从Kafka获取数据,作为作业的输入数据。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

前提条件

  • Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。

语法格式

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json",
    json_config=""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,“Kafka”表示数据源。

kafka_bootstrap_servers

Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。

kafka_group_id

group id。

kafka_topic

读取的Kafka的topic。目前只支持读取单个topic。

encode

数据编码格式,可选为“csv”“json”“blob”“user_defined”

  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“json”,则需配置“json_config”属性。
  • 当编码格式为"blob"时,表示不对接收的数据进行解析,当前表仅能有一个且为Array[TINYINT]类型的表字段。
  • 若编码格式为“user_defined”,则需配置“encode_class_name”“encode_class_parameter”属性。

encode_class_name

当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。

encode_class_parameter

当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。

json_config

当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系。

格式:"field1=json_field1;field2=json_field2"

格式说明:field1、field2为创建的表字段名称。json_field1、json_field2为kafka输入数据json串的key字段名称。

具体使用方法可以参考示例说明。

说明:

如果定义的source stream中的属性和json中的属性名称相同,json_configs可以不用配置。

field_delimiter

当encode为csv时,用于指定csv字段分隔符,默认为逗号。

quote

可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。

  • 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
  • 当引用符号为单引号时,则设置quote = "'"。
说明:
  • 目前仅适用于CSV格式。
  • 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。

start_time

kafka数据读取起始时间。

当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。

该参数配置后,只会读取Kafka topic在该时间点后产生的数据。

kafka_properties

可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2"。具体的属性值可以参考Apache Kafka中的描述。

kafka_certificate_name

跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。

说明:
  • 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。
  • Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。

注意事项

在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型

示例

  • 从Kafka读取对象为test的topic。数据编码格式为json且不含嵌套,例如:{"attr1": "lilei", "attr2": 18}。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    CREATE SOURCE STREAM kafka_source (name STRING, age int)
    WITH (
      type = "kafka",
      kafka_bootstrap_servers = "ip1:port1,ip2:port2", 
      kafka_group_id = "sourcegroup1", 
      kafka_topic = "test",
      encode = "json",
      json_config = "name=attr1;age=attr2"
    );
    
  • 从Kafka读取对象为test的topic。数据编码格式为json且包含嵌套。本示例使用了复杂数据类型ROW,ROW使用语法可以参考数据类型

    测试数据参考如下:

    {
        "id":"1",
        "type2":"online",
        "data":{
            "patient_id":1234,
            "name":"bob1234"
        }
    }
    则对应建表语句示例为:
    CREATE SOURCE STREAM kafka_source 
    (
      id STRING,
      type2 STRING,
      data ROW<
        patient_id STRING, 
        name STRING>
    )
    WITH (
      type = "kafka",
      kafka_bootstrap_servers = "ip1:port1,ip2:port2", 
      kafka_group_id = "sourcegroup1", 
      kafka_topic = "test",
      encode = "json"
    );
    
    CREATE SINK STREAM kafka_sink 
    (
      id STRING,
      type2 STRING,
      patient_id STRING, 
      name STRING
    )
      WITH (
        type="kafka",
        kafka_bootstrap_servers =  "ip1:port1,ip2:port2",
        kafka_topic = "testsink",
        encode = "csv" 
      );
    
    INSERT INTO kafka_sink select id, type2, data.patient_id, data.name from kafka_source;