更新时间:2024-10-10 GMT+08:00

创建Kafka Topic

Topic,即消息主题,用于存储消息,供生产者生产消息以及消费者订阅消息。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。

“Kafka自动创建Topic”表示向一个未创建的Topic生产或消费消息时,系统会自动创建此Topic,此Topic的默认参数值如表1所示。

如果在“配置参数”中修改“log.retention.hours”(老化时间)、“default.replication.factor”(副本数)或“num.partitions”(分区数)的参数值,此后自动创建的Topic参数值为修改后的参数值。例如:“num.partitions”修改为“5”,自动创建的Topic参数值如表1所示。

表1 Topic参数值

参数名称

默认参数值

修改后的参数值

分区数

3

5

副本数

3

3

老化时间

72小时

72小时

同步复制

不开启

不开启

同步落盘

不开启

不开启

消息时间戳类型

CreateTime

CreateTime

批处理消息最大值

10485760字节

10485760字节

本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式:

2023年5月17日及以后创建的实例不提供Kafka Manager功能,不支持在Kafka Manager中创建Topic。

约束与限制

  • Kafka实例对Topic的总分区数设置了上限,当Topic的总分区数达到上限后,用户就无法继续创建Topic。不同规格配置的Topic总分区数不同,具体请参考Kafka产品规格说明
  • 实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。

在控制台创建Topic

  1. 登录管理控制台。
  2. 在管理控制台左上角单击,选择区域。

    请选择Kafka实例所在的区域。

  3. 在管理控制台左上角单击,选择“应用中间件 > 分布式消息服务Kafka版”,进入分布式消息服务Kafka专享版页面。
  4. 单击Kafka实例的名称,进入实例详情页面。
  5. 在左侧导航栏选择“Topic管理”,单击“创建Topic”,弹出“创建Topic”对话框。
  6. 填写Topic名称和配置信息,单击“确定”,完成创建Topic。

    图1 创建Topic
    表2 Topic参数说明

    参数

    说明

    Topic名称

    名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。

    名称不能与以下内置Topic名称一致:

    • __consumer_offsets
    • __transaction_state
    • __trace
    • __connect-status
    • __connect-configs
    • __connect-offsets

    创建Topic后不能修改名称。

    说明:

    由于Kafka内核限制,无法区分句点和下划线,因此无法创建名称中只有句点和下划线不同的Topic。例如,已创建Topic_1,在新创建Topic.1时,报错如下:Topic 'topic.1' collides with existing topics: topic_1。

    分区数

    Topic的分区数。

    如果分区数与消费者数一致,分区数越大消费的并发度越大。

    该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。

    取值范围:1~200

    默认值:3

    副本数

    您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。

    该参数设置为1时,表示只有一份数据。

    取值范围:1~实例的代理数量

    说明:

    实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。

    老化时间(小时)

    消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。

    取值范围:1~720

    默认值:72

    同步复制

    表示后端收到生产消息请求并复制给所有副本后,才返回客户端。

    开启同步复制后,需要在生产者客户端的配置文件或者生产代码中配置acks=all或者-1,否则无效。

    当副本数为1时,不能选择同步复制功能。

    同步落盘

    表示生产的每条消息都会立即写入磁盘,可靠性更高。关闭同步落盘后,生产的消息存在内存中,不会立即写入磁盘。

    消息时间戳类型

    定义消息中的时间戳类型,取值如下:

    • CreateTime:生产者创建消息的时间。
    • LogAppendTime:broker将消息写入日志的时间。

    批处理消息最大值

    Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。

    如果增加“批处理消息最大值”,且存在消费者版本早于0.10.2,此时消费者的“fetch size”值也必须增加,以便消费者可以获取增加后的批处理大小。

    取值范围:0~10485760

    描述

    Topic的描述信息,长度为0~200个字符。

在Kafka Manager创建Topic

登录Kafka Manager后,在页面顶部选择“Topic > Create”,然后按照界面参数填写即可。出于性能考虑,建议单个Topic的分区数设置为200以内。

图2 在Kafka Manager中创建Topic

Topic名称开头包含特殊字符,例如“#”号时,监控数据无法展示。

在客户端创建Topic

Kafka客户端版本为2.2以上时,支持通过kafka-topics.sh创建Topic,以及管理Topic的各类参数。

  • Topic名称开头包含特殊字符,例如“#”号时,监控数据无法展示。
  • 已开启密文接入的实例,“allow.everyone.if.no.acl.found”设置为“false”时,无法通过客户端创建Topic。
  • 未开启密文接入的Kafka实例,在Kafka客户端的“/bin”目录下,通过以下命令创建Topic。
    ./kafka-topics.sh --create --topic ${topic-name} --bootstrap-server ${connection-address} --partitions ${number-of-partitions} --replication-factor ${number-of-replicas}

    参数说明如下:

    • topic-name:Topic名称,支持自定义。
    • connection-address:在Kafka控制台的“基本信息 > 连接信息”中,获取Kafka实例的连接地址。
    • number-of-partitions:Topic的分区数。出于性能考虑,建议单个Topic的分区数设置为200以内。
    • number-of-replicas:Topic的副本数。

    示例如下:

    [root@ecs-kafka bin]# ./kafka-topics.sh --create --topic topic-01 --bootstrap-server 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092 --partitions 3 --replication-factor 3
    Created topic topic-01.
    [root@ecs-kafka bin]#
  • 已开启密文接入的Kafka实例,通过以下步骤创建Topic。
    1. (可选)修改客户端配置文件。
      在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,具体如下。
      • SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
        security.protocol=SASL_PLAINTEXT
        #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=SCRAM-SHA-512
        #SASL认证机制为“PLAIN”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=PLAIN

        参数说明如下:username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。

      • SASL_SSL:如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
        security.protocol=SASL_SSL
        ssl.truststore.location={ssl_truststore_path}
        ssl.truststore.password=dms@kafka
        ssl.endpoint.identification.algorithm=
        #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=SCRAM-SHA-512
        #SASL认证机制为“PLAIN”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=PLAIN

        参数说明如下:

        • ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
        • ssl.truststore.password为服务器证书密码,不可更改需要保持为dms@kafka
        • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空
        • username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。
    2. 在Kafka客户端的“/bin”目录下,通过以下命令创建Topic。
      ./kafka-topics.sh --create --topic ${topic-name} --bootstrap-server ${connection-address} --partitions ${number-of-partitions} --replication-factor ${number-of-replicas} --command-config ../config/ssl-user-config.properties 

      参数说明如下:

      • topic-name:Topic名称,支持自定义。
      • connection-address:在Kafka控制台的“基本信息 > 连接信息”中,获取Kafka实例的连接地址。
      • number-of-partitions:Topic的分区数。出于性能考虑,建议单个Topic的分区数设置为200以内。
      • number-of-replicas:Topic的副本数。

      示例如下:

      [root@ecs-kafka bin]# ./kafka-topics.sh --create --topic topic-01 --bootstrap-server 192.168.xx.xx:9093,192.168.xx.xx:9093,192.168.xx.xx:9093 --partitions 3 --replication-factor 3 --command-config ../config/ssl-user-config.properties
      Created topic topic-01.
      [root@ecs-kafka bin]#