创建Kafka Topic
Topic,即消息主题,用于存储消息,供生产者生产消息以及消费者订阅消息。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。
“Kafka自动创建Topic”表示向一个未创建的Topic生产或消费消息时,系统会自动创建此Topic,此Topic的默认参数值如表1所示。
集群实例支持在“配置参数”中修改“log.retention.hours”(老化时间)、“default.replication.factor”(副本数)或“num.partitions”(分区数)的参数值,单机实例不支持修改配置参数。修改参数值后自动创建的Topic,Topic参数值为修改后的值。
例如:“num.partitions”修改为“5”,自动创建的Topic参数值如表1所示。
参数名称 |
单机实例默认值 |
集群实例默认值 |
集群实例修改后的值 |
---|---|---|---|
分区数 |
1 |
3 |
5 |
副本数 |
1 |
3 |
3 |
老化时间 |
72小时 |
72小时 |
72小时 |
同步复制 |
不开启 |
不开启 |
不开启 |
同步落盘 |
不开启 |
不开启 |
不开启 |
消息时间戳类型 |
CreateTime |
CreateTime |
CreateTime |
批处理消息最大值 |
10485760字节 |
10485760字节 |
10485760字节 |
本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式:
2023年5月17日及以后创建的实例不提供Kafka Manager功能,不支持在Kafka Manager中创建Topic。
约束与限制
在控制台创建Topic
- 登录管理控制台。
- 在管理控制台左上角单击,选择区域。
请选择Kafka实例所在的区域。
- 在管理控制台左上角单击,选择“应用中间件 > 分布式消息服务Kafka版”,进入分布式消息服务Kafka专享版页面。
- 单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏选择“Topic管理”,单击“创建Topic”,弹出“创建Topic”对话框。
- 填写Topic名称和配置信息,单击“确定”,完成创建Topic。
图1 创建Topic(集群实例)
表2 Topic参数说明 参数
说明
Topic名称
名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。
名称不能与以下内置Topic名称一致:
- __consumer_offsets
- __transaction_state
- __trace
- __connect-status
- __connect-configs
- __connect-offsets
创建Topic后不能修改名称。
说明:由于Kafka内核限制,无法区分句点和下划线,因此无法创建名称中只有句点和下划线不同的Topic。例如,已创建Topic_1,在新创建Topic.1时,报错如下:Topic 'topic.1' collides with existing topics: topic_1。
分区数
Topic的分区数。
如果分区数与消费者数一致,分区数越大消费的并发度越大。
该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。
取值范围:1~200
副本数
您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
该参数设置为1时,表示只有一份数据。
取值范围:1~实例的代理数量
说明:实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
老化时间(小时)
消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
取值范围:1~720
同步复制
表示后端收到生产消息请求并复制给所有副本后,才返回客户端。
开启同步复制后,需要在生产者客户端的配置文件或者生产代码中配置acks=all或者-1,否则无效。
当副本数为1时,不能选择同步复制功能。
同步落盘
表示生产的每条消息都会立即写入磁盘,可靠性更高。关闭同步落盘后,生产的消息存在内存中,不会立即写入磁盘。
消息时间戳类型
定义消息中的时间戳类型,取值如下:
- CreateTime:生产者创建消息的时间。
- LogAppendTime:broker将消息写入日志的时间。
批处理消息最大值
Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。
如果增加“批处理消息最大值”,且存在消费者版本早于0.10.2,此时消费者的“fetch size”值也必须增加,以便消费者可以获取增加后的批处理大小。
取值范围:0~10485760
描述
Topic的描述信息,长度为0~200个字符。
在Kafka Manager创建Topic
登录Kafka Manager后,在页面顶部选择“Topic > Create”,然后按照界面参数填写即可。出于性能考虑,建议单个Topic的分区数设置为200以内。
Topic名称开头包含特殊字符,例如“#”号时,监控数据无法展示。
在客户端创建Topic
Kafka客户端版本为2.2以上时,支持通过kafka-topics.sh创建Topic,以及管理Topic的各类参数。
- Topic名称开头包含特殊字符,例如“#”号时,监控数据无法展示。
- 已开启密文接入的实例,“allow.everyone.if.no.acl.found”设置为“false”时,无法通过客户端创建Topic。
- 未开启密文接入的Kafka实例,在Kafka客户端的“/bin”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic ${topic-name} --bootstrap-server ${connection-address} --partitions ${number-of-partitions} --replication-factor ${number-of-replicas}
参数说明如下:
- topic-name:Topic名称,支持自定义。
- connection-address:在Kafka控制台的“基本信息 > 连接信息”中,获取Kafka实例的连接地址。
- number-of-partitions:Topic的分区数。出于性能考虑,建议单个Topic的分区数设置为200以内。
- number-of-replicas:Topic的副本数。
示例如下:
[root@ecs-kafka bin]# ./kafka-topics.sh --create --topic topic-01 --bootstrap-server 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092 --partitions 3 --replication-factor 3 Created topic topic-01. [root@ecs-kafka bin]#
- 已开启密文接入的Kafka实例,通过以下步骤创建Topic。
- (可选)修改客户端配置文件。
在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,具体如下。
- SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
security.protocol=SASL_PLAINTEXT #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN
参数说明如下:username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。
- SASL_SSL:如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN
参数说明如下:
- ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
- ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。
- SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
- 在Kafka客户端的“/bin”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic ${topic-name} --bootstrap-server ${connection-address} --partitions ${number-of-partitions} --replication-factor ${number-of-replicas} --command-config ../config/ssl-user-config.properties
参数说明如下:
- topic-name:Topic名称,支持自定义。
- connection-address:在Kafka控制台的“基本信息 > 连接信息”中,获取Kafka实例的连接地址。
- number-of-partitions:Topic的分区数。出于性能考虑,建议单个Topic的分区数设置为200以内。
- number-of-replicas:Topic的副本数。
示例如下:
[root@ecs-kafka bin]# ./kafka-topics.sh --create --topic topic-01 --bootstrap-server 192.168.xx.xx:9093,192.168.xx.xx:9093,192.168.xx.xx:9093 --partitions 3 --replication-factor 3 --command-config ../config/ssl-user-config.properties Created topic topic-01. [root@ecs-kafka bin]#
- (可选)修改客户端配置文件。