High level KafkaStreams API Usage Sample
Function Description
The following code snippets are used in the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class to implement the following function:
Collects statistics on input records. Same words are divided into a group, which is used as a key value. The occurrence times of each word are calculated as a value and are output in the form of a key-value pair.
Code Sample
static void createWordCountStream(final StreamsBuilder builder) { // Receive the input records from input-topic. final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME); // Aggregate the calculation result of the key-value pair. final KTable<String, Long> counts = source // Process the received records and split according to the regular expression REGEX_STRING. .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING))) // Aggregate the calculation result of the key-value pair. .groupBy((key, value) -> value) // The final calculation result .count(); // Output the key-value pair of the calculation result from the output topic. counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long())); }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.