Updated on 2022-11-18 GMT+08:00

High level KafkaStreams API Usage Sample

Function Description

The following code snippets are used in the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class to implement the following function:

Collects statistics on input records. Same words are divided into a group, which is used as a key value. The occurrence times of each word are calculated as a value and are output in the form of a key-value pair.

Code Sample

static void createWordCountStream(final StreamsBuilder builder) {
     // Receive the input records from input-topic.
     final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

     // Aggregate the calculation result of the key-value pair. 
     final KTable<String, Long> counts = source
             // Process the received records and split according to the regular expression REGEX_STRING.
             .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
             // Aggregate the calculation result of the key-value pair. 
             .groupBy((key, value) -> value)
             // The final calculation result
             .count();

     // Output the key-value pair of the calculation result from the output topic.
     counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
 }