规则
调用Kafka API(AdminUtils.createTopic)创建Topic时,需要设置ZkStringSerializer
- 对于Java开发语言,正确示例:
import org.I0Itec.zkclient.ZkClient; import kafka.utils.ZKStringSerializer$; … ZkClient zkClient = new ZkClient(zkconnectstring, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$); AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties()); …
- 对于Scala开发语言,正确示例:
import org.I0Itec.zkclient.ZkClient; import kafka.utils.ZKStringSerializer; … var zkclient: ZkClient = new ZkClient(zkconnectstring, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties())
Partition的副本数不要超过节点个数
Kafka中Topic的Partition的副本是为了提升数据的可靠性而存在的,同一个Partition的副本会分布在不同的节点,因此副本数不允许超过节点个数。
Consumer客户端的配置参数“fetch.message.max.bytes”大小
Consumer客户端的配置参数“fetch.message.max.bytes”必须大于等于Producer客户端每次产生的消息最大字节数。如果参数的值太小,可能导致Producer产生的消息无法被Consumer成功消费。