High-Level Streams API Sample Usage Guide

Updated on 2022-11-18 GMT+08:00
  1. Open the sample code WordCountDemo.java in the IDE and set the following two variables to the keytab file and account name of the machine-machine account that you applied for.
    // Keytab file of the machine-machine account that you applied for.
    private static final String USER_KEYTAB_FILE = "Change it to the real-world keytab file name.";
    // Name of the machine-machine account that you applied for.
    private static final String USER_PRINCIPAL = "Change it to the real-world username.";
  2. Use the Linux client to create the input and output topics. Ensure that the topic names match those in the sample code, set the cleanup policy of the output topic to compact, and then run the sample code.
    // Name of the user-created source topic (the input topic)
    private static final String INPUT_TOPIC_NAME = "streams-wordcount-input";
    // Name of the user-created sink topic (the output topic)
    private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-output";
    • Create the input topic:

      kafka-topics.sh --create --zookeeper <comma-separated ZooKeeper addresses> --replication-factor 1 --partitions 1 --topic streams-wordcount-input

    • Create the output topic:

      kafka-topics.sh --create --zookeeper <comma-separated ZooKeeper addresses> --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

    • Run the sample code WordCountDemo.java. For details, see "Commissioning an Application in Windows" and "Commissioning an Application in Linux".
  3. Use the Linux client to write messages and view the word-count results.

    Run the kafka-console-producer.sh command to write messages to the input topic.

    # kafka-console-producer.sh --broker-list <comma-separated broker addresses> --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
    >This is Kafka Streams test
    >test starting
    >now Kafka Streams is running
    >test end

    Run the kafka-console-consumer.sh command to consume data from the output topic and view the word-count results.

    # kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server <comma-separated broker addresses> --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter
    this    1
    is      6
    kafka   12
    streams 8
    test    8
    test    9
    starting 1
    now     1
    kafka   13
    streams 9
    is      7
    running 1
    test    10
    end     1
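
The consumer output above can be read as a changelog: each incoming word produces a new record carrying that word's running count, which is why test appears several times with increasing values. The following is a minimal plain-Java sketch of that behavior. It is not the WordCountDemo.java implementation and does not use Kafka. The class and method names are hypothetical, lowercasing and splitting on non-word characters are assumptions inferred from the lowercase keys in the output, and counting starts from an empty state, so the absolute counts differ from the listing above. The compact method illustrates what cleanup.policy=compact on the output topic is meant to achieve eventually: only the latest value for each key is retained.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only: simulates a word-count changelog in memory.
class WordCountChangelogSketch {

    // Emits one (word, running count) update per word occurrence.
    // Assumption: words are lowercased and split on non-word characters.
    static List<Map.Entry<String, Long>> countWords(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // skip the empty token a leading separator produces
                }
                long count = counts.merge(word, 1L, Long::sum);
                updates.add(new AbstractMap.SimpleImmutableEntry<>(word, count));
            }
        }
        return updates;
    }

    // Keeps only the latest value per key, which is the end state
    // that log compaction converges to.
    static Map<String, Long> compact(List<Map.Entry<String, Long>> updates) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : updates) {
            latest.put(update.getKey(), update.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // The same four messages that step 3 writes to the input topic.
        List<String> lines = Arrays.asList(
                "This is Kafka Streams test",
                "test starting",
                "now Kafka Streams is running",
                "test end");

        List<Map.Entry<String, Long>> updates = countWords(lines);
        for (Map.Entry<String, Long> update : updates) {
            System.out.println(update.getKey() + "\t" + update.getValue());
        }
        System.out.println("Latest value per key: " + compact(updates));
    }
}
```

Running main prints 14 update records followed by the compacted view, in which test ends at 3. A real Kafka Streams application may emit fewer intermediate updates because of record caching, and a broker compacts logs asynchronously, so older records for a key can remain visible to a consumer for some time.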




