How Do I Set the Offset Correctly When Using the Old Plug-in storm-kafka?
Question
Currently, the storm-kafka-client plug-in is recommended by default for a secure connection with Kafka. However, the old plug-in storm-kafka is still preferred by some users and in some scenarios. This section describes how to correctly specify the offset to consume from, so that consumption does not start over from the beginning when the topology is restarted.
Answer
KafkaSpout in the old plug-in storm-kafka uses the SimpleConsumer API of Kafka, so offsets must be managed by the spout itself. KafkaSpout stores the offset of each partition of the topic in ZooKeeper, under a path built from user-defined fields. The constructor is defined as follows:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
    super(hosts, topic);
    this.zkRoot = zkRoot;
    this.id = id;
}
In this definition, hosts indicates the ZooKeeper connection string, for example 192.168.0.1:2181/kafka. topic indicates the name of the topic to be consumed. zkRoot indicates the root path for storing offsets in ZooKeeper, which is generally /kafka/{topic}. id indicates the application ID, for example app1. There are two scenarios in which the offset is read:
- Scenario 1
When the topology runs, KafkaSpout stores the offset in ZooKeeper under the path /{zkRoot}/{id}/{partitionId}. zkRoot and id are specified by the user, and partitionId is obtained automatically. By default, after the topology is started, the historical offset is read from this storage path in ZooKeeper and used as the starting point of the current consumption. Therefore, keeping zkRoot and id fixed prevents consumption from starting over from the beginning.
- Scenario 2
If no fixed zkRoot or id is set as in scenario 1, the historical offset cannot be read, and historical data is re-consumed every time the topology is submitted. In this case, you can manually specify the start point of consumption as follows:
SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId);
spoutConfig.ignoreZkOffsets = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
Setting ignoreZkOffsets to true and startOffsetTime to LatestTime() in SpoutConfig forces consumption to start from the latest data.
In practice, you are advised to use the method in scenario 1. With the settings in scenario 2, consumption does not resume from the last successfully committed offset, so some data may be missed. A minimal configuration sketch for scenario 1 follows.
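The sketch below illustrates the scenario 1 approach: a SpoutConfig with a fixed zkRoot and id wired into a simple topology. The ZooKeeper address, topic name, zkRoot, application ID, and topology name are placeholder values, and Storm 1.x package names (org.apache.storm.kafka.*) are assumed; adjust them to match your environment.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class FixedOffsetTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper connection string (placeholder), as described above.
        BrokerHosts hosts = new ZkHosts("192.168.0.1:2181/kafka");

        // Fixed zkRoot and id: KafkaSpout stores offsets under
        // /{zkRoot}/{id}/{partitionId}, so a restarted topology resumes
        // from the stored offset instead of the beginning.
        String topic = "input-topic";
        String zkRoot = "/kafka/input-topic";
        String appId = "app1";
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, appId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // Add downstream bolts here as required by your application.

        StormSubmitter.submitTopology("fixed-offset-topology", new Config(),
                builder.createTopology());
    }
}

As long as zkRoot and appId stay the same across submissions, the spout reads the stored offset from ZooKeeper on startup and continues from there.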