更新时间:2022-09-08 GMT+08:00
分享

使用旧插件storm-kafka时如何正确设置offset

问题

当前虽然默认推荐使用storm-kafka-client插件进行安全kafka对接,但仍然存在使用旧插件storm-kafka的用户和场景,在这种场景下如何正确指定消费的offset,避免每次重启拓扑后都从头开始消费?

回答

旧插件storm-kafka中的KafkaSpout使用的是Kafka的“SimpleConsumer”接口,需要自主管理offset,KafkaSpout中根据用户定义的字段将Topic中每个Patition的offset记录在ZooKeeper中,定义如下:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
    super(hosts, topic);
    this.zkRoot = zkRoot;
    this.id = id;
}

其中“hosts”是ZooKeeper的连接串,如:192.168.0.1:2181/kafka,“topic”是待消费的Topic名,“zkRoot”表示在ZooKeeper中的存放数据的根路径,一般为:“/kafka/{topic}”“id”表示应用的标示,如:app1。读取offset会有以下两种场景:

  • 场景1

    当拓扑运行后,KafkaSpout会将offset存放在ZooKeeper路径:“/{zkRoot}/{id}/{partitionId}”下,其中“zkRoot”“id”是用户指定的,“partitionId”是自动获取的。默认情况下,拓扑在启动后会先从ZooKeeper上的offset存放路径读取历史的offset,用作本次的消费起点,因此只需要正确的指定“zkRoot”“id”,就可以继承历史记录的offset,不用从头开始消费。

  • 场景2

    没有像场景1中那样设置固定的“zkRoot”或者“id”,导致无法读取历史的offset,如此一来每次提交拓扑都会把历史已经消费过的数据再消费一遍,这时需要通过如下方式手动指定:

    SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId);
    spoutConfig.ignoreZkOffsets = true;
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    通过指定SpoutConfig中的“ignoreZkOffsets”“startOffsetTime”来强制消费最新的数据。

在实际使用中推荐使用场景1中的方式,因为场景2中并非从上次commit成功的位置开始,因此可能会存在部分数据遗漏。

分享:

    相关文档

    相关产品