更新时间:2024-05-29 GMT+08:00

创建Kafka Topic

Topic,即消息主题。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。

“Kafka自动创建Topic”表示在生产或消费一个未创建的Topic时,会自动创建此Topic,此Topic的默认参数值如下:

  • 分区数为3。
  • 副本数为3。
  • 老化时间为72小时。
  • 不开启同步复制和同步落盘。
  • 消息时间戳类型为CreateTime。
  • 批处理消息最大值为10485760字节。

如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。

例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:

  • 分区数为5。
  • 副本数为3。
  • 老化时间为72小时。
  • 不开启同步复制和同步落盘。
  • 消息时间戳类型为CreateTime。
  • 批处理消息最大值为10485760字节。

本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式:

约束与限制

  • Kafka实例对Topic的总分区数设置了上限,当Topic的总分区数达到上限后,用户就无法继续创建Topic。不同规格配置的Topic总分区数不同,具体请参考Kafka产品规格说明
  • 实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。

方式1:在控制台创建Topic

  1. 登录管理控制台。
  2. 在管理控制台左上角单击,选择区域。

    请选择Kafka实例所在的区域。

  3. 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。
  4. 单击Kafka实例的名称,进入实例详情页面。
  5. 在左侧导航栏选择“Topic管理”,单击“创建Topic”,弹出“创建Topic”对话框。

    图1 创建Topic

  6. 填写Topic名称和配置信息。

    表1 Topic参数说明

    参数

    说明

    Topic名称

    系统为您自动生成了Topic名称,您可以根据需要修改。

    创建Topic后不能修改名称。

    分区数

    您可以设置Topic的分区数,分区数越大消费的并发度越大。

    该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。

    取值范围:1~200

    默认值:3

    副本数

    您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。

    该参数设置为1时,表示只有一份数据。

    取值范围:1~实例的代理数量

    说明:

    实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。

    老化时间(小时)

    消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。

    取值范围:1~720

    默认值:72

    同步复制

    指后端收到生产消息请求并复制给所有副本后,才返回客户端。

    开启同步复制后,需要在客户端配置acks=all或者-1,否则无效。

    当副本数为1时,不能选择同步复制功能。

    同步落盘

    • 开启:生产的每条消息都会立即写入磁盘,可靠性更高。
    • 关闭:生产的消息存在内存中,不会立即写入磁盘。

    消息时间戳类型

    定义消息中的时间戳类型,取值如下:

    • CreateTime:生产者创建消息的时间。
    • LogAppendTime:broker将消息写入日志的时间。

    批处理消息最大值

    Kafka允许的最大批处理大小,如果启用消息压缩,则表示压缩后的最大批处理大小。

    如果增加“批处理消息最大值”,且存在消费者版本早于0.10.2,此时消费者的“fetch size”值也必须增加,以便消费者可以获取增加后的批处理大小。

    取值范围:0~10485760

  7. 配置完成后,单击“确定”,完成创建Topic。

方式2:在Kafka Manager创建Topic

登录Kafka Manager后,在页面顶部选择“Topic > Create”,然后按照界面参数填写即可。

图2 在Kafka Manager中创建Topic

Topic名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。

方式3:在Kafka客户端上创建Topic

Kafka客户端版本为2.2以上时,支持通过kafka-topics.sh创建Topic,以及管理Topic的各类参数。

  • Topic名称开头包含特殊字符,例如#号“#”时,监控数据无法展示。
  • 已开启SASL的实例,“allow.everyone.if.no.acl.found”设置为“false”时,无法通过客户端创建Topic。
  • 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
    ./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}
  • 已开启SASL的Kafka实例,通过以下步骤创建Topic。
    1. (可选)如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤。否则请执行以下操作。

      在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加用户名和密码,以及SSL证书配置。

    2. 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
      ./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --command-config ./config/ssl-user-config.properties