Updated on 2022-11-18 GMT+08:00

Low level Kafka Streams API Sample Usage Guide

  1. Use the Linux client to create the input and output topics. Ensure that the topic names are the same as those in the sample code, set the policy for clearing the output topic to compact, and run the sample code.
    // source-topic name created by the user, that is, input topic 
    private static final String INPUT_TOPIC_NAME = "streams-wordcount-processor-input";
    
    // sink-topic name created by the user, that is, output topic 
    private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-processor-output";
    • Create the input topic:

      kafka-topics.sh --create --zookeeper 192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-input

    • Create the output topic:

      kafka-topics.sh --create --zookeeper 192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-output--config cleanup.policy=compact

    • Run sample code WordCountProcessorDemo.java. For details, seeCommissioning an Application in WindowsandCommissioning an Application in Linux.
  2. Use the Linux client to write messages and view the statistics result.

    Run the kafka-console-producer.sh command to write messages to the input topic.

    # kafka-console-producer.sh --broker-list 192.168.0.11:9092,192.168.0.12:9092,192.168.0.13:9092 --topic streams-wordcount-processor-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
    >This is Kafka Streams test
    >now Kafka Streams is running
    >test end
    >

    Run the kafka-console-consumer.sh command to consume data from the output topic and view the statistics result.

    # kafka-console-consumer.sh --topic streams-wordcount-processor-output --bootstrap-server 192.168.0.11:9092,192.168.0.12:9092,192.168.0.13:9092 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true
    is      1
    kafka   1
    streams 1
    test    1
    this    1
    end     1
    is      2
    kafka   2
    now     1
    running 1
    streams 2
    test    2
    this    1