Low-Level KafkaStreams API Usage Sample
Function Description
The following code snippet, taken from the com.huawei.bigdata.kafka.example.WordCountProcessorDemo class, implements the following function:
Counts the words in the input records. Identical words are grouped together, and each word is used as a key. The number of occurrences of each word is calculated as the value, and the results are output as key-value pairs.
Code Sample
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // ProcessorContext instance, which provides access to the metadata of the record being processed.
            private ProcessorContext context;
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // Keep a reference to the processor context, because it is needed for scheduling punctuations and for commit().
                this.context = context;
                // Schedule a punctuation that runs once for every second of stream time.
                this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                    try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                        System.out.println("----------- " + timestamp + " ----------- ");
                        while (iter.hasNext()) {
                            final KeyValue<String, Integer> entry = iter.next();
                            System.out.println("[" + entry.key + ", " + entry.value + "]");
                            // Send the current count to the downstream processor as a key-value pair.
                            context.forward(entry.key, entry.value.toString());
                        }
                    }
                });
                // Look up the key-value state store named KEY_VALUE_STATE_STORE_NAME, which holds the word counts accumulated so far.
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // Process each record received from the input topic: split the line into words and count each word.
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);
                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);
                    if (oldValue == null) {
                        // First occurrence of this word.
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }

            @Override
            public void close() {
                // No resources to release.
            }
        };
    }
}
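The counting logic in process() can be tried without a Kafka cluster. The following is a minimal sketch, not part of the sample project: a plain LinkedHashMap stands in for the KeyValueStore, and the class name WordCountSketch, the ASSUMED_REGEX constant (a guess at what REGEX_STRING might contain), the use of Locale.ROOT, and the skipping of empty tokens are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class WordCountSketch {
    // Hypothetical stand-in for REGEX_STRING; the real value is defined in the sample class.
    private static final String ASSUMED_REGEX = "\\s+";

    // Mirrors process(): lowercase the line, split it into words, and increment each word's count.
    static void process(Map<String, Integer> store, String line) {
        String[] words = line.toLowerCase(Locale.ROOT).split(ASSUMED_REGEX);
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // Assumption: ignore the empty token produced by leading whitespace.
            }
            // Insert 1 for a new word, otherwise add 1 to the existing count.
            store.merge(word, 1, Integer::sum);
        }
    }

    // Feeds every line through process() and returns the accumulated counts.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> store = new LinkedHashMap<>();
        for (String line : lines) {
            process(store, line);
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("Hello Kafka", "hello streams"));
        // Prints the pairs in the same "[key, value]" form used by the punctuation callback.
        counts.forEach((k, v) -> System.out.println("[" + k + ", " + v + "]"));
    }
}
```

Running main prints [hello, 2], [kafka, 1], and [streams, 1], one pair per line. In the real processor the counts live in the state store rather than in a local map, so they persist across calls to process() and are forwarded downstream each time the scheduled punctuation runs.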