使用旧插件storm-kafka时如何正确设置offset
问题
当前虽然默认推荐使用storm-kafka-client插件进行安全kafka对接,但仍然存在使用旧插件storm-kafka的用户和场景,在这种场景下如何正确指定消费的offset,避免每次重启拓扑后都从头开始消费?
回答
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id; }
其中“hosts”是ZooKeeper的连接串,如:192.168.0.1:2181/kafka,“topic”是待消费的Topic名,“zkRoot”表示在ZooKeeper中的存放数据的根路径,一般为:“/kafka/{topic}”,“id”表示应用的标示,如:app1。读取offset会有以下两种场景:
- 场景1
当拓扑运行后,KafkaSpout会将offset存放在ZooKeeper路径:“/{zkRoot}/{id}/{partitionId}”下,其中“zkRoot”和“id”是用户指定的,“partitionId”是自动获取的。默认情况下,拓扑在启动后会先从ZooKeeper上的offset存放路径读取历史的offset,用作本次的消费起点,因此只需要正确的指定“zkRoot”和“id”,就可以继承历史记录的offset,不用从头开始消费。
- 场景2
没有像场景1中那样设置固定的“zkRoot”或者“id”,导致无法读取历史的offset,如此一来每次提交拓扑都会把历史已经消费过的数据再消费一遍,这时需要通过如下方式手动指定:
SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId); spoutConfig.ignoreZkOffsets = true; spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
通过指定SpoutConfig中的“ignoreZkOffsets”和“startOffsetTime”来强制消费最新的数据。
在实际使用中推荐使用场景1中的方式,因为场景2中并非从上次commit成功的位置开始,因此可能会存在部分数据遗漏。