Updated on 2024-08-10 GMT+08:00

KafkaStreams Sample

Function

This section provides sample code for the high-level and low-level Kafka Streams APIs. The sample reads messages from the input topic, counts the occurrences of each word in those messages, and writes the results as key-value pairs to the output topic, where they can be viewed by consuming that topic.

High Level Kafka Streams API Sample Code

The following code snippets are in the createWordCountStream method of the com.huawei.bigdata.kafka.example.WordCountDemo class.
 /**
  * Builds the word-count topology on the given builder: records are read from
  * INPUT_TOPIC_NAME, split into lower-case words, counted per word, and the
  * running counts are written to OUTPUT_TOPIC_NAME.
  *
  * @param builder the streams builder that the topology is added to
  */
 static void createWordCountStream(final StreamsBuilder builder) {
     // Read the input records from the input topic.
     final KStream<String, String> lines = builder.stream(INPUT_TOPIC_NAME);

     // Lower-case every record value and split it on REGEX_STRING; each resulting token becomes its own record.
     final KStream<String, String> words = lines.flatMapValues(
             line -> Arrays.asList(line.toLowerCase(Locale.getDefault()).split(REGEX_STRING)));

     // Re-key each record by the word itself, then count the records per word.
     final KTable<String, Long> counts = words
             .groupBy((ignoredKey, word) -> word)
             .count();

     // Write the (word, count) pairs to the output topic with String keys and Long values.
     final Produced<String, Long> outputSerdes = Produced.with(Serdes.String(), Serdes.Long());
     counts.toStream().to(OUTPUT_TOPIC_NAME, outputSerdes);
 }
// keytab file name of the machine-machine account that a user applies for
private static final String USER_KEYTAB_FILE = "Change it to the real-world keytab file name.";
// Machine-machine account that a user applies for
private static final String USER_PRINCIPAL = "Change it to the real-world username."

Low Level Kafka Streams API Sample Code

The following code snippets are in the com.huawei.bigdata.kafka.example.WordCountProcessorDemo class.
/**
 * Supplies the word-count processor used by the low-level API sample.
 * The processor keeps one counter per word in the state store named
 * KEY_VALUE_STATE_STORE_NAME and forwards the stored counts downstream
 * from a scheduled punctuation.
 */
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // Context handed over in init(); kept so that the punctuation can forward records.
            private ProcessorContext context;
            // Word -> number of occurrences seen so far.
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // Keep a reference to the processor context for later use by the punctuation.
                this.context = context;
                // Register a punctuation with a one-second interval, driven by stream time.
                this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, this::emitCounts);
                // Look up the key-value state store that holds the per-word counters.
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // Prints every stored (word, count) pair and forwards it to the downstream processor.
            private void emitCounts(long timestamp) {
                try (final KeyValueIterator<String, Integer> counts = kvStore.all()) {
                    System.out.println("----------- " + timestamp + " ----------- ");
                    while (counts.hasNext()) {
                        final KeyValue<String, Integer> wordCount = counts.next();
                        System.out.println("[" + wordCount.key + ", " + wordCount.value + "]");
                        // The count is forwarded as a String value, keyed by the word.
                        context.forward(wordCount.key, wordCount.value.toString());
                    }
                }
            }

            // Splits the record value into lower-case words and increments the stored counter of each word.
            @Override
            public void process(String dummy, String line) {
                final String[] tokens = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);

                for (final String token : tokens) {
                    final Integer previous = kvStore.get(token);
                    // A word that is not in the store yet starts at 1.
                    kvStore.put(token, previous == null ? 1 : previous + 1);
                }
            }

            @Override
            public void close() {
                // Nothing to release here.
            }
        };
    }
}

// keytab file name of the machine-machine account that a user applies for
private static final String USER_KEYTAB_FILE = "Change it to the real-world keytab file name.";
// Machine-machine account that a user applies for
private static final String USER_PRINCIPAL = "Change it to the real-world username."