
Low-Level Streams API Sample Usage Guide

Commissioning Applications on Windows

For details about how to commission applications on Windows, see Commissioning Applications on Windows.

Commissioning Applications on Linux

  1. Compile the project to generate a JAR package, and copy the JAR package to the src/main/resources directory at the same level as the dependency library folder. For details, see Commissioning Applications on Linux.
  2. Log in as user root to the node where the cluster client is installed, and run the following commands to go to the client directory, configure environment variables, and authenticate:

    cd /opt/client

    source bigdata_env

    kinit Component operation user (for example, developuser)

  3. Create an input topic and an output topic. Ensure that the topic names are the same as those specified in the sample code. Set the cleanup policy of the output topic to compact.

    kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 1 --partitions 1 --topic Topic name

    To query the IP address of the quorumpeer instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > ZooKeeper, and click the Instance tab. Use commas (,) to separate multiple IP addresses. You can obtain the ZooKeeper client connection port by querying the ZooKeeper configuration parameter clientPort. For example, the port number is 2181.

    Run the following commands:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-input

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-output --config cleanup.policy=compact
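
    Topics can also be created programmatically. The following is a minimal sketch using the Kafka AdminClient; the broker address and port are placeholders (replace them with the broker IP address and Kafka connection port described in 5), and the Kerberos/SASL client settings required in security mode are omitted:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateTopicsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder; replace with "IP address of the broker instance:Kafka connection port".
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.17:21007");
            try (Admin admin = Admin.create(props)) {
                NewTopic input = new NewTopic("streams-wordcount-processor-input", 1, (short) 1);
                // The compact cleanup policy keeps only the latest count per word once compaction runs.
                NewTopic output = new NewTopic("streams-wordcount-processor-output", 1, (short) 1)
                        .configs(Collections.singletonMap(
                                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Arrays.asList(input, output)).all().get();
            }
        }
    }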

  4. After the topics are created, run the following command to start the application:

    java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.WordCountDemo
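
    The source of WordCountDemo is not reproduced here. As a reference, the following is a minimal sketch of a word count built on the low-level Processor API. The class name, state store name, broker address, and plain-text connection settings are assumptions for illustration; the shipped sample additionally configures the Kerberos/SASL security properties required in security mode:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class WordCountProcessorSketch {

        // Counts words with a key-value state store and forwards every updated count downstream.
        static class WordCountProcessor implements Processor<String, String, String, String> {
            private KeyValueStore<String, Integer> store;
            private ProcessorContext<String, String> context;

            @Override
            public void init(ProcessorContext<String, String> context) {
                this.context = context;
                this.store = context.getStateStore("Counts"); // store name is an assumption
            }

            @Override
            public void process(Record<String, String> record) {
                for (String word : record.value().toLowerCase().split("\\W+")) {
                    Integer old = store.get(word);
                    int count = (old == null) ? 1 : old + 1;
                    store.put(word, count);
                    // Each update is emitted immediately, which is why the output in 6
                    // shows the same word with increasing counts.
                    context.forward(new Record<>(word, Integer.toString(count), record.timestamp()));
                }
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
            // Placeholder; replace with "IP address of the broker instance:Kafka connection port".
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.17:21007");

            // Wire source topic -> processor (with its state store) -> sink topic.
            Topology topology = new Topology();
            topology.addSource("Source", Serdes.String().deserializer(), Serdes.String().deserializer(),
                    "streams-wordcount-processor-input");
            topology.addProcessor("Process", WordCountProcessor::new, "Source");
            topology.addStateStore(Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("Counts"), Serdes.String(), Serdes.Integer()), "Process");
            topology.addSink("Sink", "streams-wordcount-processor-output",
                    Serdes.String().serializer(), Serdes.String().serializer(), "Process");

            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close(Duration.ofSeconds(10))));
        }
    }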

  5. Open a new client window and run the following commands to use kafka-console-producer.sh to write messages to the input topic:

    cd /opt/client

    source bigdata_env

    kinit Component operation user (for example, developuser)

    kafka-console-producer.sh --broker-list IP address of the broker instance:Kafka connection port --topic streams-wordcount-processor-input --producer.config /opt/client/Kafka/kafka/config/producer.properties

    • IP address of the broker instance: Log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instance, and query the instance IP address on the instance list page.
    • Kafka connection port: If Kerberos authentication is enabled for the cluster (security mode), the broker port number is the value of sasl.port. If Kerberos authentication is disabled for the cluster (normal mode), the broker port number is the value of port.
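
    Alternatively, messages can be written with a small Java producer instead of kafka-console-producer.sh. A minimal sketch, using a placeholder broker address and omitting the security settings that producer.properties supplies in security mode:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProduceInputSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder; replace with "IP address of the broker instance:Kafka connection port".
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.17:21007");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Each record value is one line of text for the word-count application to split.
                producer.send(new ProducerRecord<>("streams-wordcount-processor-input",
                        "This is Kafka Streams test"));
                producer.flush();
            }
        }
    }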

  6. Open a new client window and run the following commands to use kafka-console-consumer.sh to consume data from the output topic and view the result:

    cd /opt/client

    source bigdata_env

    kinit Component operation user (for example, developuser)

    kafka-console-consumer.sh --topic streams-wordcount-processor-output --bootstrap-server IP address of the broker instance:Kafka connection port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true

    In the producer window, write the following messages to the input topic:

    >This is Kafka Streams test 
    >test starting 
    >now Kafka Streams is running 
    >test end 

    The consumer window displays the following output. The application emits an updated count each time a word recurs, so the same word can appear multiple times with increasing counts; because the output topic uses cleanup.policy=compact, Kafka eventually retains only the latest count for each word.

    this    1 
    is      1 
    kafka   1 
    streams 1 
    test    1 
    test    2 
    starting 1 
    now     1 
    kafka   2 
    streams 2 
    is      2 
    running 1 
    test    3 
    end     1
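
    The console consumer can likewise be replaced with a small Java consumer that prints both keys and values, mirroring the print.key=true and --from-beginning options above. A minimal sketch under the same assumptions (placeholder broker address, security settings omitted):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ConsumeOutputSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder; replace with "IP address of the broker instance:Kafka connection port".
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.17:21007");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-verify");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // like --from-beginning
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("streams-wordcount-processor-output"));
                while (true) {
                    for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                        System.out.printf("%s\t%s%n", r.key(), r.value()); // key and value, tab-separated
                    }
                }
            }
        }
    }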