创建Topic
Topic,即消息主题。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。
“Kafka自动创建Topic”表示在生产或消费一个未创建的Topic时,会自动创建此Topic,此Topic的默认参数值如下:分区数为3,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:分区数为5,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。
Kafka实例对Topic的总分区数设置了上限,当Topic的总分区数达到上限后,用户就无法继续创建Topic。不同规格配置的Topic总分区数不同,具体请参考Kafka产品规格说明。
本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式:
实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
方式1:在控制台创建
- 登录管理控制台。
- 在管理控制台左上角单击,选择区域。
请选择Kafka实例所在的区域。
- 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。
- 单击Kafka实例的名称,进入实例详情页面。
- 选择“Topic管理”页签,单击“创建Topic”。
弹出“创建Topic”对话框。
图1 创建Topic
- 填写Topic名称和配置信息。
表1 Topic参数说明 参数
说明
Topic名称
系统为您自动生成了Topic名称,您可以根据需要修改。
创建Topic后不能修改名称。
分区数
您可以设置Topic的分区数,分区数越大消费的并发度越大。
该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。
取值范围:1~100
默认值:3
副本数
您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
该参数设置为1时,表示只有一份数据。
默认值:3
说明:实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
老化时间(小时)
消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
取值范围:1-168
默认值:72
同步复制
指后端收到生产消息请求并复制给所有副本后,才返回客户端。
开启同步复制后,需要在客户端配置acks=all或者-1,否则无效。
当副本数为1时,不能选择同步复制功能。
同步落盘
同步落盘是指生产的每条消息都会立即写入磁盘。
- 开启:生产的每条消息都会立即写入磁盘,可靠性更高。
- 关闭:生产的消息存在内存中,不会立即写入磁盘。
- 配置完成后,单击“确定”,完成创建Topic。
方式2:在Kafka Manager创建
登录Kafka Manager后,在页面顶部选择“Topic > Create”,然后按照界面参数填写即可。
Topic名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
方式3:在Kafka客户端上创建
Kafka客户端版本为2.2以上时,支持通过kafka-topics.sh创建Topic,以及管理Topic的各类参数。
Topic名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
- 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}
- 已开启SASL的Kafka实例,通过以下步骤创建Topic。
- (可选)如果已经设置了SSL证书配置,请跳过此步骤。否则请执行以下操作。
在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加SSL证书配置。
- 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --command-config ./config/ssl-user-config.properties
- (可选)如果已经设置了SSL证书配置,请跳过此步骤。否则请执行以下操作。