- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 如何修改安全协议?
- 修改企业项目,是否会导致Kafka重启?
- 100MB/s的带宽怎样开启公网访问?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka包周期实例支持删除吗?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 怎样重新绑定公网IP?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
- Topic和分区问题
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的账号是否可以设置为只读账号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 客户端删除消费组后,在Kafka Manager中仍可以看到此消费组?
- 监控告警问题
- Kafka体验版使用说明
-
实例问题
- 故障排除
- 视频帮助
- 文档下载
- 通用参考
展开导读
链接复制成功!
Kafka客户端参数配置建议
Kafka客户端的配置参数很多,以下提供Producer和Consumer几个常用参数配置。不同版本的Kafka客户端参数名称可能不同,以下配置参数适用于1.1.0及以上版本。其他参数和版本配置,请参考Kafka配置。
参数 |
默认值 |
推荐值 |
说明 |
---|---|---|---|
acks |
1 |
高可靠:all或者-1 高吞吐:1 |
收到Server端确认信号个数,表示producer需要收到多少个这样的确认信号,算消息发送成功。acks参数代表了数据备份的可用性。常用选项: acks=0:表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1。 acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。 acks=all或者-1:这意味着leader需要等待ISR中所有备份都成功写入日志。只要任何一个备份存活,数据都不会丢失。min.insync.replicas指定必须确认写入才能被认为成功的副本的最小数量。 |
retries |
0 |
结合实际业务调整 |
客户端发送消息的重试次数。值大于0时,这些数据发送失败后,客户端会重新发送。 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 针对网络闪断场景,生产者建议配置重试能力,推荐重试次数retries=3,重试间隔retry.backoff.ms=1000。 |
request.timeout.ms |
30000 |
结合实际业务调整 |
设置一个请求最大等待时间(单位为ms),超过这个时间则会抛Timeout异常。 超时时间如果设置大一些,如127000(127秒),高并发的场景中,能减少发送失败的情况。 |
block.on.buffer.full |
TRUE |
TRUE |
TRUE表示当我们内存用尽时,停止接收新消息记录或者抛出错误。 默认情况下,这个设置为TRUE。然而某些阻塞可能不值得期待,因此立即抛出错误更好。如果设置为false,则producer抛出一个异常错误:BufferExhaustedException |
batch.size |
16384 |
262144 |
默认的批量处理消息字节数上限。producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。不会试图处理大于这个字节数的消息字节数。 发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。 较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。 |
buffer.memory |
33554432 |
67108864 |
producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。 这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 |
enable.idempotence |
|
如果无幂等需求,建议设置为false。 |
在生产者客户端中开启幂等,然后生产消息,此时您会在消费者客户端或Kafka控制台的“消息查询”中观察到消息offset不连续的现象。这是因为开启了幂等后,在生产消息时会产生一些元数据控制消息,这些控制消息也会生产到该Topic中, 且它们对消费者不可见,从而造成offset不连续的现象。 |
参数 |
默认值 |
推荐值 |
说明 |
---|---|---|---|
auto.commit.enable |
TRUE |
FALSE |
如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程无法提供服务时,由新的consumer使用。 约束: 设置为false后,需要先成功消费再提交,这样可以避免消息丢失。 |
auto.offset.reset |
latest |
earliest |
没有初始化offset或者offset被删除时,可以设置以下值:
如果将此配置设置为latest,新增分区时,生产者可能会在消费者重置初始偏移量之前开始向新增加的分区发送消息,从而导致部分消息丢失。 |
connections.max.idle.ms |
600000 |
30000 |
空连接的超时时间(单位为ms),设置为30000可以在网络异常场景下减少请求卡顿的时间。 |
max.poll.records |
500 |
需小于max.poll.interval.ms时间内的消费处理能力。 |
消费者单次从Broker中拉取的最大消息条数。 |
max.poll.interval.ms |
300000 |
如果在两次poll之间存在复杂、耗时的逻辑,需要延长该参数值。 |
消费者两次拉取消息的最大时间间隔,单位为ms。如果两次拉取消息的时间间隔超过该间隔时间,就视为此次消费失败,消费者被踢出消费组,触发Rebalance。 |
heartbeat.interval.ms |
3000 |
>=3000 |
消费者和Kafka之间的心跳间隔,单位为ms。 |
session.timeout.ms |
10000 |
将该参数值设置为heartbeat.interval.ms的3倍以上。 |
使用消费组管理offset时,消费者与Broker的会话超时时间,单位为ms。 |
fetch.max.bytes |
1000000 |
max.request.size < message.max.bytes < fetch.max.bytes |
消费者单次从Broker中拉取消息的最大字节数。 |