更新时间:2024-04-23 GMT+08:00

Kafka

Kafka是一个分布式流处理平台,最初由LinkedIn开发。它是一个高吞吐量、低延迟的平台,可以处理大量的实时数据流。Kafka主要由三个部分组成:生产者、消费者和代理服务器。生产者将数据发布到Kafka集群,消费者从Kafka集群订阅数据并进行处理,代理服务器则是Kafka集群中的核心组件,负责处理消息的存储和转发。Kafka的主要优点是可扩展性、高吞吐量、低延迟、可靠性和持久性。它被广泛应用于大数据处理、实时数据流处理、日志收集等领域。

创建Kafka连接

  1. 登录新版ROMA Connect控制台。
  2. 在左侧导航栏选择“连接器”,在连接器页面单击“新建连接”。
  3. 选择“Kafka”连接器。
  4. 在弹窗中配置连接器信息,完成后单击“确定”。

    参数

    说明

    连接名称

    填写连接器实例名称。

    Brokers地址

    填写Kafka的Brokers地址。

    认证方式

    选择Kafka的认证方式。

    • SSL认证
    • 无认证

    SASL鉴权方式

    仅当“认证方式”选择“SSL认证”时需要配置

    选择SASL鉴权方式。

    • PLAIN
    • SCRAM-SHA-512

    SSL用户名

    仅当“认证方式”选择“SSL认证”时需要配置

    填写SSL用户名/应用Key。

    SSL密码

    仅当“认证方式”选择“SSL认证”时需要配置

    填写SSL密码。

    SSL证书格式

    仅当“认证方式”选择“SSL认证”时需要配置

    选择SSL证书格式。

    • PEM
    • JKS

    SSL证书

    仅当“认证方式”选择“SSL认证”时需要配置

    填写经过base64编码的证书内容。

    SSL证书密码

    仅当“认证方式”选择“SSL认证”时需要配置

    填写SSL证书密码。

    描述

    填写连接器的描述信息,用于识别不同的连接器。

支持的动作

消费消息

配置参数

参数

说明

主题

需要监听的topic。

GroupId

用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group。

KeyDeserializer

键的反序列化方式,默认为:org.apache.kafka.common.serialization.StringDeserializer。

ValueDeserializer

值的反序列化方式,默认为:org.apache.kafka.common.serialization.StringDeserializer。

AutoOffsetReset

设置当没有初始的偏移量或者偏移量超出范围时,从哪里开始消费。

  • latest:从最近的偏移量开始消费;
  • earliest:从最早的偏移量开始消费;
  • none:抛出异常。