High-Level KafkaStreams API Usage Sample
Function Description
The following code snippets are used in the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class to implement the following function:
Counts the words in the input records. Identical words are grouped together, and the word itself is used as the key. The number of occurrences of each word is calculated as the value, and the results are output as key-value pairs.
Code Sample
static void createWordCountStream(final StreamsBuilder builder) {
    // Receive the input records from the input topic.
    final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

    // Build a table of per-word counts from the input stream.
    final KTable<String, Long> counts = source
        // Convert each record to lowercase and split it into words using the regular expression REGEX_STRING.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
        // Regroup the records so that each word becomes the key.
        .groupBy((key, value) -> value)
        // Count the occurrences of each word.
        .count();

    // Write the resulting key-value pairs to the output topic.
    counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
}
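To see what this topology computes without a running Kafka cluster, the same split, group, and count steps can be sketched with plain Java collections. This is only an illustration, not part of the sample project: the class name WordCountSketch and the method countWords are hypothetical, and the value of REGEX_STRING is an assumption (whitespace), because the sample does not show how that constant is defined.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors the word-count logic with in-memory data.
final class WordCountSketch {
    // Assumption: the sample's REGEX_STRING is not shown; whitespace is used here.
    static final String REGEX_STRING = "\\s+";

    // Takes record values and returns word -> number of occurrences.
    static Map<String, Long> countWords(List<String> records) {
        return records.stream()
            // Same step as flatMapValues: lowercase each record and split it into words.
            .flatMap(value -> Arrays.stream(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
            // Same steps as groupBy + count: use the word as the key and count its occurrences.
            .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Two input records, as they might arrive on the input topic.
        System.out.println(countWords(List.of("Hello Kafka", "hello streams")));
    }
}
```

Unlike this sketch, which returns one final map, the Kafka Streams version processes records continuously and writes an updated count to the output topic as the count for a word changes.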