Deze pagina is nog niet beschikbaar in uw eigen taal. We werken er hard aan om meer taalversies toe te voegen. Bedankt voor uw steun.

On this page

High level KafkaStreams API Usage Sample

Updated on 2022-09-14 GMT+08:00

Function Description

The following code snippets are used in the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class to implement the following function:

Collects statistics on input records. Same words are divided into a group, which is used as a key value. The occurrence times of each word are calculated as a value and are output in the form of a key-value pair.

Code Sample

static void createWordCountStream(final StreamsBuilder builder) {
     // Receive the input records from input-topic.
     final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

     // Aggregate the calculation result of the key-value pair. 
     final KTable<String, Long> counts = source
             // Process the received records and split according to the regular expression REGEX_STRING.
             .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
             // Aggregate the calculation result of the key-value pair. 
             .groupBy((key, value) -> value)
             // The final calculation result
             .count();

     // Output the key-value pair of the calculation result from the output topic.
     counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
 }
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback