更新时间:2024-07-01 GMT+08:00
分享

Kafka源表

功能描述

创建source流从Kafka获取数据,作为作业的输入数据。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

前提条件

Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。

注意事项

对接的Kafka集群不支持开启SASL_SSL。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
create table kafkaSource(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
)
with (
  'connector.type' = 'kafka',
  'connector.version' = '',
  'connector.topic' = '',
  'connector.properties.bootstrap.servers' = '',
  'connector.properties.group.id' = '',
  'connector.startup-mode' = '',
  'format.type' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

connector类型,对于kafka,需配置为'kafka'。

connector.version

Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。

format.type

数据反序列化格式,支持:'csv', 'json'及'avro'等。

format.field-delimiter

属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。

connector.topic

kafka topic名。该参数和“connector.topic-pattern”两个参数只能使用其中一个。

connector.topic-pattern

匹配读取kafka topic名称的正则表达式。该参数和“connector.topic”两个参数只能使用其中一个。

例如:

'topic.*'

'(topic-c|topic-d)'

'(topic-a|topic-b|topic-\\d*)'

'(topic-a|topic-b|topic-[0-9]*)'

connector.properties.bootstrap.servers

kafka brokers地址,以逗号分隔。

connector.properties.group.id

消费组名称

connector.startup-mode

consumer启动模式,支持:'earliest-offset', 'latest-offset', 'group-offsets', 'specific-offsets'及'timestamp'。默认值为'group-offsets'。

connector.specific-offsets

指定消费offset,'startup-mode'为'specific-offsets'时需配置,格式为: 'partition:0,offset:42;partition:1,offset:300'。

connector.startup-timestamp-millis

指定起始消费时间戳,'startup-mode'为'timestamp'时需配置。

connector.properties.*

配置kafka任意原生属性。

示例

  • 从Kafka中读取编码格式为csv,对象为kafkaSource的表。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    create table kafkaSource(
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_speed INT)
    with (
      'connector.type' = 'kafka',
      'connector.version' = '0.11',
      'connector.topic' = 'test-topic',
      'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
      'connector.properties.group.id' = 'test-group',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'csv'
    );
    
  • 从Kafka中读取编码格式为不含嵌套的json数据,对象为kafkaSource的表。
    例如不含嵌套的json数据格式为:
    {"car_id": 312, "car_owner": "wang", "car_brand": "tang"}
    {"car_id": 313, "car_owner": "li", "car_brand": "lin"}
    {"car_id": 314, "car_owner": "zhao", "car_brand": "han"}
    则创建表语句为:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    create table kafkaSource(
      car_id STRING,
      car_owner STRING,
      car_brand STRING
    )
    with (
      'connector.type' = 'kafka',
      'connector.version' = '0.11',
      'connector.topic' = 'test-topic',
      'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
      'connector.properties.group.id' = 'test-group',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'json'
    );
    
  • 从Kafka中读取编码格式包含嵌套的json数据,对象为kafkaSource的表。

    例如包含嵌套的json数据格式为:

    {
        "id":"1",
        "type":"online",
        "data":{
            "patient_id":1234,
            "name":"bob1234",
            "age":"Bob",
            "gmt_create":"Bob",
            "gmt_modify":"Bob"
        }
    }
    则创建表语句为:
    CREATE table kafkaSource(
      id STRING,
      type STRING,
      data ROW(
        patient_id STRING, 
        name STRING, 
        age STRING, 
        gmt_create STRING, 
        gmt_modify STRING)
    ) 
    with (
      'connector.type' = 'kafka',
      'connector.version' = '0.11',
      'connector.topic' = 'test-topic',
      'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
      'connector.properties.group.id' = 'test-group',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'json'
    );

相关文档