更新时间:2024-09-27 GMT+08:00

Kafka结果表

功能描述

DLI将Flink作业的输出数据输出到Kafka中。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

前提条件

Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。

注意事项

对接的Kafka集群不支持开启SASL_SSL。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
create table kafkaSource(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector.type' = 'kafka',
  'connector.version' = '',
  'connector.topic' = '',
  'connector.properties.bootstrap.servers' = '',
  'format.type' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

connector类型,对于kafka,需配置为'kafka'。

connector.version

Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。

format.type

数据序列化格式,支持:'csv'、 'json'及'avro'等。

format.field-delimiter

属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。

connector.topic

kafka topic名。

connector.properties.bootstrap.servers

kafka brokers地址,以逗号分隔。

connector.sink-partitioner

记录分区的方式,支持:'fixed', 'round-robin'及'custom'。

connector.sink-partitioner-class

'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' 。

update-mode

支持:'append'、'retract'及'upsert'三种写入模式。

connector.properties.*

配置kafka任意原生属性

示例

将kafkaSink的数据输出到Kafka中
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table kafkaSink(
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_speed INT)
with (
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'test-topic',
  'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'connector.sink-partitioner' = 'round-robin',
  'format.type' = 'csv'
);