创建Kafka Topic
Topic,即消息主题。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。
“Kafka自动创建Topic”表示在生产或消费一个未创建的Topic时,会自动创建此Topic,此Topic的默认参数值如下:
- 分区数为3。
- 副本数为3。
- 老化时间为72小时。
- 不开启同步复制和同步落盘。
- 消息时间戳类型为CreateTime。
- 批处理消息最大值为10485760字节。
如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。
例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:
- 分区数为5。
- 副本数为3。
- 老化时间为72小时。
- 不开启同步复制和同步落盘。
- 消息时间戳类型为CreateTime。
- 批处理消息最大值为10485760字节。
本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式:
约束与限制
- Kafka实例对Topic的总分区数设置了上限,当Topic的总分区数达到上限后,用户就无法继续创建Topic。不同规格配置的Topic总分区数不同,具体请参考Kafka产品规格说明。
- 实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
方式1:在控制台创建Topic
- 登录管理控制台。
- 在管理控制台左上角单击,选择区域。
请选择Kafka实例所在的区域。
- 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。
- 单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏选择“Topic管理”,单击“创建Topic”,弹出“创建Topic”对话框。
图1 创建Topic
- 填写Topic名称和配置信息。
表1 Topic参数说明 参数
说明
Topic名称
系统为您自动生成了Topic名称,您可以根据需要修改。
创建Topic后不能修改名称。
分区数
您可以设置Topic的分区数,分区数越大消费的并发度越大。
该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。
取值范围:1~200
默认值:3
副本数
您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
该参数设置为1时,表示只有一份数据。
取值范围:1~实例的代理数量
说明:实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
老化时间(小时)
消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
取值范围:1~720
默认值:72
同步复制
指后端收到生产消息请求并复制给所有副本后,才返回客户端。
开启同步复制后,需要在客户端配置acks=all或者-1,否则无效。
当副本数为1时,不能选择同步复制功能。
同步落盘
- 开启:生产的每条消息都会立即写入磁盘,可靠性更高。
- 关闭:生产的消息存在内存中,不会立即写入磁盘。
消息时间戳类型
定义消息中的时间戳类型,取值如下:
- CreateTime:生产者创建消息的时间。
- LogAppendTime:broker将消息写入日志的时间。
批处理消息最大值
Kafka允许的最大批处理大小,如果启用消息压缩,则表示压缩后的最大批处理大小。
如果增加“批处理消息最大值”,且存在消费者版本早于0.10.2,此时消费者的“fetch size”值也必须增加,以便消费者可以获取增加后的批处理大小。
取值范围:0~10485760
- 配置完成后,单击“确定”,完成创建Topic。
方式2:在Kafka Manager创建Topic
登录Kafka Manager后,在页面顶部选择“Topic > Create”,然后按照界面参数填写即可。
Topic名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
方式3:在Kafka客户端上创建Topic
Kafka客户端版本为2.2以上时,支持通过kafka-topics.sh创建Topic,以及管理Topic的各类参数。
- Topic名称开头包含特殊字符,例如#号“#”时,监控数据无法展示。
- 已开启SASL的实例,“allow.everyone.if.no.acl.found”设置为“false”时,无法通过客户端创建Topic。
- 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}
- 已开启SASL的Kafka实例,通过以下步骤创建Topic。
- (可选)如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤。否则请执行以下操作。
在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加用户名和密码,以及SSL证书配置。
- 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --command-config ./config/ssl-user-config.properties
- (可选)如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤。否则请执行以下操作。