High-Level Kafka Streams API Usage Sample

Updated on 2022-09-14 GMT+08:00

Function Description

The following code snippet is from the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class. It implements the following function:

Counts the words in the input records. Each record is converted to lowercase and split into words. Identical words are grouped together, with the word itself used as the key, and the number of occurrences of each word is calculated as the value. The results are output as key-value pairs.

Code Sample

static void createWordCountStream(final StreamsBuilder builder) {
     // Read the input records from the input topic (INPUT_TOPIC_NAME).
     final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

     // Build a table of word counts: key = word, value = number of occurrences.
     final KTable<String, Long> counts = source
             // Convert each record value to lowercase and split it into words using the regular expression REGEX_STRING.
             .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
             // Regroup the records by word, so that the word becomes the key.
             .groupBy((key, value) -> value)
             // Count the occurrences of each key (word).
             .count();

     // Write the resulting key-value pairs to the output topic (OUTPUT_TOPIC_NAME).
     counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
 }
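
The split, group, and count steps can be tried out without a Kafka cluster. The following is a minimal plain-Java sketch of the same logic, not part of the sample project. The class name WordCountSketch and the method countWords are hypothetical, and the value of REGEX_STRING is an assumption because the snippet above does not show it. The sketch uses Locale.ROOT instead of Locale.getDefault() so that the result does not depend on the JVM locale, and it returns only the final counts, whereas a KTable may emit an updated count each time a new record arrives.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the split/group/count steps; it has no Kafka dependency.
class WordCountSketch {
    // Assumption: the source does not show REGEX_STRING. "\\W+" (one or more
    // non-word characters) is used here only as a stand-in.
    static final String REGEX_STRING = "\\W+";

    // Returns a map from each word to its number of occurrences, sorted by word.
    static Map<String, Long> countWords(final Iterable<String> records) {
        final Map<String, Long> counts = new TreeMap<>();
        for (final String value : records) {
            // Same transformation as flatMapValues: lowercase, then split.
            for (final String word : value.toLowerCase(Locale.ROOT).split(REGEX_STRING)) {
                // Same effect as groupBy + count: the word is the key,
                // and its count is increased by one.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(final String[] args) {
        // Prints {hello=2, kafka=1, world=1}
        System.out.println(countWords(Arrays.asList("hello world", "Hello Kafka")));
    }
}
```

As in the stream version, empty tokens are not filtered out. A record that starts with a separator, such as " hello", yields an empty string as its first token, and that empty string is counted like any other word.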