High-Level Kafka Streams API Usage Sample
Function Description
The following code snippets are used in the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class to implement the following function:
Counts the words in the input records. Identical words are grouped together, and the word itself is used as the key. The number of occurrences of each word is calculated as the value, and the result is output as key-value pairs.
Code Sample
static void createWordCountStream(final StreamsBuilder builder) {
    // Read the input records from the input topic.
    final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

    // Build a table that maps each word to its number of occurrences.
    final KTable<String, Long> counts = source
        // Convert each record value to lowercase and split it into words using the regular expression REGEX_STRING.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
        // Group the records by word so that the word becomes the key.
        .groupBy((key, value) -> value)
        // Count the occurrences of each word.
        .count();

    // Write the resulting key-value pairs (word, count) to the output topic.
    counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
}
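To make the split, group, and count steps easier to follow, the following minimal sketch reproduces the same logic in plain Java without Kafka. It is an illustration only, not part of the sample project. The class name WordCountSketch, the method countWords, and the constant ASSUMED_REGEX are hypothetical. The value of REGEX_STRING is not shown in this section, so the sketch assumes that records are split on whitespace. It also uses Locale.ROOT instead of Locale.getDefault() so that the result does not depend on the machine's locale.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class WordCountSketch {
    // Assumption: REGEX_STRING is not shown in this section; whitespace is used here for illustration.
    private static final String ASSUMED_REGEX = "\\s+";

    // Returns the final count of each word, in order of first appearance.
    public static Map<String, Long> countWords(List<String> records) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String value : records) {
            // Same step as flatMapValues: convert to lowercase and split the record into words.
            for (String word : value.toLowerCase(Locale.ROOT).split(ASSUMED_REGEX)) {
                // Same effect as groupBy + count: the word is the key, the occurrence count is the value.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("Kafka Streams", "kafka streams API");
        // Prints {kafka=2, streams=2, api=1}
        System.out.println(countWords(records));
    }
}
```

Unlike this sketch, which returns only the final totals, the KTable in the sample is updated continuously, so the output topic can receive an updated count each time a word is seen again.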