Kafka结果表
功能描述
DLI将Flink作业的输出数据输出到Kafka中。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
前提条件
Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。
注意事项
对接的Kafka集群不支持开启SASL_SSL。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 | create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' ); |
参数说明
参数 | 是否必选 | 说明 |
|---|---|---|
connector.type | 是 | connector类型,对于kafka,需配置为'kafka'。 |
connector.version | 否 | Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 |
format.type | 是 | 数据序列化格式,支持:'csv'、 'json'及'avro'等。 |
format.field-delimiter | 否 | 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 |
connector.topic | 是 | kafka topic名。 |
connector.properties.bootstrap.servers | 是 | kafka brokers地址,以逗号分隔。 |
connector.sink-partitioner | 否 | 记录分区的方式,支持:'fixed', 'round-robin'及'custom'。 |
connector.sink-partitioner-class | 否 | 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' 。 |
update-mode | 否 | 支持:'append'、'retract'及'upsert'三种写入模式。 |
connector.properties.* | 否 | 配置kafka任意原生属性 |
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 | create table kafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.sink-partitioner' = 'round-robin', 'format.type' = 'csv' ); |

