Low-Level Kafka Streams API Usage Sample
Function Description
The following code snippet, taken from the com.huawei.bigdata.kafka.example.WordCountProcessorDemo class, implements the following function:
It counts the words in the input records. Each distinct word is used as a key, and the number of times that word occurs is used as the value. The results are output as key-value pairs.
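The grouping and counting described above can be illustrated without Kafka. The following is a minimal sketch, not the sample's actual implementation: the class name WordCountSketch, the method countWords, and the separator pattern "\\W+" are assumptions made for illustration, and a plain in-memory map stands in for the state store.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class WordCountSketch {
    // Assumption: words are separated by runs of non-word characters.
    private static final String REGEX_STRING = "\\W+";

    // Counts how often each word occurs across all input lines.
    // Key: the lower-cased word. Value: its number of occurrences.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase(Locale.ROOT).split(REGEX_STRING)) {
                if (word.isEmpty()) {
                    continue; // skip the empty token split() can produce at the start
                }
                // Insert 1 for a new word, otherwise add 1 to the stored count.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("Hello Kafka", "hello streams kafka");
        // prints {hello=2, kafka=2, streams=1}
        System.out.println(countWords(records));
    }
}
```

In the real sample the counts live in a Kafka Streams state store rather than a local map, so they survive across calls to process() and are emitted periodically instead of being returned.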
Code Sample
import java.time.Duration;
import java.util.Locale;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Nested class of WordCountProcessorDemo. KEY_VALUE_STATE_STORE_NAME and REGEX_STRING
// are constants referenced from the enclosing class and are not shown in this excerpt.
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // ProcessorContext instance, which provides access to the metadata of the record being processed.
            private ProcessorContext context;
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // Keep a reference to the processor context so that it can be used later, for example to forward or commit.
                this.context = context;

                // Schedule a punctuation callback that runs each time stream time advances by one second.
                this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                    // try-with-resources closes the store iterator when the callback finishes.
                    try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                        System.out.println("----------- " + timestamp + " ----------- ");
                        while (iter.hasNext()) {
                            final KeyValue<String, Integer> entry = iter.next();
                            System.out.println("[" + entry.key + ", " + entry.value + "]");
                            // Send the current count to the downstream processor as a key-value pair.
                            context.forward(entry.key, entry.value.toString());
                        }
                    }
                });

                // Look up the key-value state store named KEY_VALUE_STATE_STORE_NAME, which holds the running word counts.
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // Process each record received from the input topic: split the line into words and count them.
            // The record key is not used.
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);
                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);
                    if (oldValue == null) {
                        // First occurrence of this word.
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }

            @Override
            public void close() {
                // Nothing to release.
            }
        };
    }
}
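The process() method above lower-cases each line and splits it with REGEX_STRING, whose value is not shown in this excerpt. The following sketch assumes a whitespace pattern ("\\s+") purely for illustration, and uses the hypothetical names TokenizeSketch and tokenize, to show how the split step behaves. With such a pattern, String.split() returns an empty first token when a line starts with a separator, so the empty string would be counted as a word unless it is filtered out.

```java
import java.util.Arrays;
import java.util.Locale;

public class TokenizeSketch {
    // Assumption: the real REGEX_STRING is not shown in the sample; "\\s+" is a stand-in.
    private static final String REGEX_STRING = "\\s+";

    // Same two steps as process(): lower-case the line, then split it into words.
    // Locale.ROOT is used here (instead of Locale.getDefault()) so the result
    // does not depend on the locale of the machine running the sketch.
    public static String[] tokenize(String line) {
        return line.toLowerCase(Locale.ROOT).split(REGEX_STRING);
    }

    public static void main(String[] args) {
        // prints [hello, kafka]
        System.out.println(Arrays.toString(tokenize("Hello Kafka")));
        // A leading separator yields an empty first token: prints [, hello, kafka]
        System.out.println(Arrays.toString(tokenize("  Hello Kafka")));
    }
}
```

Whether the sample is affected depends on the actual pattern and on the input data. If empty tokens are unwanted, one option is to skip words for which isEmpty() returns true before updating the store.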