
High-Level Kafka Streams API Sample Usage Guide

  1. Use the Linux client to create the input and output topics. Ensure that the topic names are the same as those in the sample code, set the cleanup policy (cleanup.policy) of the output topic to compact, and then run the sample code.
    // Name of the user-created source topic, that is, the input topic
    private static final String INPUT_TOPIC_NAME = "streams-wordcount-input";
    // Name of the user-created sink topic, that is, the output topic
    private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-output";
    • Create the input topic:

      kafka-topics.sh --create --zookeeper 192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

    • Create the output topic:

      kafka-topics.sh --create --zookeeper 192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

    • Run the sample code WordCountDemo.java. For details, see Commissioning an Application in Windows and Commissioning an Application in Linux. A minimal sketch of the word count topology is given after this procedure.
  2. Use the Linux client to write messages and view the statistics result.

    Run the kafka-console-producer.sh command to write messages to the input topic.

    # kafka-console-producer.sh --broker-list 192.168.0.11:21005,192.168.0.12:21005,192.168.0.13:21005 --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
    >This is Kafka Streams test
    >test starting
    >now Kafka Streams is running
    >test end
    >

    Run the kafka-console-consumer.sh command to consume data from the output topic and view the statistics result. Each output record is an update of the running count for a word, so the same word can appear multiple times with an increasing value. A programmatic consumer sketch is also given after this procedure.

    # kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server 192.168.0.11:21005,192.168.0.12:21005,192.168.0.13:21005 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter
    this    1
    is      6
    kafka   12
    streams 8
    test    8
    test    9
    starting 1
    now     1
    kafka   13
    streams 9
    is      7
    running 1
    test    10
    end     1
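
For reference, the word count logic that WordCountDemo.java implements can be approximated with the Kafka Streams high-level DSL as in the sketch below. This is a minimal sketch rather than the shipped sample: the class name, application ID, and broker address are placeholder assumptions, and a secured cluster additionally needs the security settings from the client configuration.

    import java.util.Arrays;
    import java.util.Locale;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    public class WordCountSketch {
        private static final String INPUT_TOPIC_NAME = "streams-wordcount-input";
        private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-output";

        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder values; use the broker addresses and security settings of your own cluster.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.11:21005");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

            // Split each line into lowercase words, group by word, and keep a running count.
            KTable<String, Long> counts = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                    .groupBy((key, word) -> word)
                    .count();

            // Emit the count updates to the compacted output topic as <String, Long> records.
            counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }

Because the counts are emitted as updates to a KTable changelog, the compact cleanup policy on the output topic lets Kafka retain only the latest count for each word.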
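
If you want to check the result in code instead of with kafka-console-consumer.sh, the sketch below reads the output topic with String keys and Long values, matching the deserializers used in the command above. The broker address, group ID, and class name are placeholders for illustration; a secured cluster additionally needs the settings from consumer.properties.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class WordCountOutputReader {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker address and group ID.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.11:21005");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-output-reader");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Keys are words (String); values are running counts (Long).
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

            try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
                while (true) {
                    ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, Long> record : records) {
                        System.out.printf("%s\t%d%n", record.key(), record.value());
                    }
                }
            }
        }
    }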