Low-Level Kafka Streams API Usage Sample
Function Description
The following code snippet is from the com.huawei.bigdata.kafka.example.WordCountProcessorDemo class. It implements the following function:
Counts the words in the input records. Each input line is split into words, and each distinct word is used as a key. The number of times the word occurs is the value, and the results are output as key-value pairs.
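The counting rule described above can be sketched in plain Java without any Kafka dependency. This is only an illustration under stated assumptions: the class name WordCountSketch and the delimiter ASSUMED_REGEX (one or more whitespace characters) are hypothetical, because the sample does not show the value of REGEX_STRING, and a TreeMap stands in for the state store.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch; it is not part of WordCountProcessorDemo.
class WordCountSketch {
    // Assumption: words are separated by whitespace. The sample's REGEX_STRING is not shown.
    private static final String ASSUMED_REGEX = "\\s+";

    // Groups identical (lower-cased) words and counts how often each one occurs.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase(Locale.ROOT).trim().split(ASSUMED_REGEX)) {
                if (word.isEmpty()) {
                    continue; // a blank line yields one empty token; skip it
                }
                // Insert 1 for a new word, otherwise add 1 to the stored count.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(List.of("Hello Kafka", "hello streams"));
        // Print each result as a key-value pair, in the same style as the sample.
        counts.forEach((word, n) -> System.out.println("[" + word + ", " + n + "]"));
    }
}
```

The merge call plays the same role as the null check followed by put in the processor's process() method.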
Code Sample
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // ProcessorContext instance, which provides access to the metadata of the record being processed.
            private ProcessorContext context;
            // State store that maps each word to its current count.
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // Keep a local reference to the processor context for later use (for example, to forward records or request a commit).
                this.context = context;
                // Schedule a punctuation for every second of stream time. Stream time advances with record timestamps, not with the wall clock.
                this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                    try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                        System.out.println("----------- " + timestamp + " ----------- ");
                        while (iter.hasNext()) {
                            final KeyValue<String, Integer> entry = iter.next();
                            System.out.println("[" + entry.key + ", " + entry.value + "]");
                            // Send each word and its count to the downstream processor as a key-value pair.
                            context.forward(entry.key, entry.value.toString());
                        }
                    }
                });
                // Look up the key-value state store named KEY_VALUE_STATE_STORE_NAME, which holds the word counts.
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // Process each record received from the input topic: split the line into words and update the count of each word.
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);
                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);
                    if (oldValue == null) {
                        // First occurrence of the word.
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }

            @Override
            public void close() {
                // No resources to release.
            }
        };
    }
}
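Two properties of the sample are easy to miss. With PunctuationType.STREAM_TIME the callback is driven by the timestamps of incoming records, so it does not fire while no records arrive. Each callback also forwards every entry currently in the store, not only the words that changed. The following plain Java sketch models both points. It is a simplified illustration, not Kafka's actual scheduler: the class StreamTimePunctuationSketch, its firing rule, and the "timestamp:word=count" output format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the sample's processor. It does not use
// or reproduce the Kafka Streams scheduler; it only illustrates the idea.
class StreamTimePunctuationSketch {
    private final long intervalMs;
    // Stands in for the KeyValueStore<String, Integer> of the sample.
    private final Map<String, Integer> store = new TreeMap<>();
    // Stands in for records sent downstream by context.forward(key, value).
    private final List<String> forwarded = new ArrayList<>();
    private boolean started = false;
    private long streamTimeMs;
    private long lastPunctuationMs;

    StreamTimePunctuationSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Counts one word, then advances stream time from the record timestamp.
    void process(long recordTimestampMs, String word) {
        store.merge(word, 1, Integer::sum);
        if (!started) {
            started = true;
            streamTimeMs = recordTimestampMs;
            lastPunctuationMs = recordTimestampMs;
        }
        // Stream time never moves backwards, even for out-of-order records.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        // Assumed firing rule: punctuate once stream time has advanced by the interval.
        if (streamTimeMs - lastPunctuationMs >= intervalMs) {
            lastPunctuationMs = streamTimeMs;
            punctuate(streamTimeMs);
        }
    }

    // Forwards a snapshot of the whole store, as the sample's callback does.
    private void punctuate(long timestampMs) {
        store.forEach((word, count) -> forwarded.add(timestampMs + ":" + word + "=" + count));
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        StreamTimePunctuationSketch sketch = new StreamTimePunctuationSketch(1000L);
        sketch.process(0L, "a");
        sketch.process(500L, "a");
        sketch.process(1200L, "b"); // stream time has advanced by more than one second
        sketch.forwarded().forEach(System.out::println);
    }
}
```

If the downstream consumer needs output at fixed real-time intervals regardless of traffic, PunctuationType.WALL_CLOCK_TIME is the alternative to consider.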