High Level Streams Sample Commissioning
Commissioning Applications on Windows
For details about commissioning applications on Windows, see Commissioning Applications on Windows.
Commissioning Applications on Linux
- Compile and generate a JAR package, and copy the JAR package to the src/main/resources directory at the same level as the dependent library folder. For details, see Commissioning Applications on Linux.
- Log in to the cluster client node as the cluster installation user.
cd /opt/client
source bigdata_env
kinit Component operation user (for example, developuser)
- Create an input topic and an output topic. Ensure that the topic names are the same as those specified in the sample code. Set the cleanup policy of the output topic to compact.
kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 1 --partitions 1 --topic Topic name
To query the IP address of the quorumpeer instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > ZooKeeper, and click the Instance tab. Use commas (,) to separate multiple IP addresses. You can obtain the ZooKeeper client connection port by querying the ZooKeeper configuration parameter clientPort. The default value is 2181.
Run the following commands:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
- After the topics are created, run the following command to start the application:
java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.WordCountDemo
- Open a new client window and run the following commands to use kafka-console-producer.sh to write messages to the input topic:
cd /opt/client
source bigdata_env
kinit Component operation user (for example, developuser)
kafka-console-producer.sh --broker-list IP address of the broker instance:Kafka connection port (for example, 192.168.0.13:9092) --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
- Open a new client window and run the following commands to use kafka-console-consumer.sh to consume data from the output topic and view the result:
cd /opt/client
source bigdata_env
kinit Component operation user (for example, developuser)
kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server IP address of the broker instance:Kafka connection port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter
In the producer window, write the following messages to the input topic, one message per line:
>This is Kafka Streams test
>test starting
>now Kafka Streams is running
>test end
The output is as follows:
this 1
is 1
kafka 1
streams 1
test 1
test 2
starting 1
now 1
kafka 2
streams 2
is 2
running 1
test 3
end 1
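The output lists a running count per word, not a final total, which is why test appears three times with the values 1, 2, and 3. The following plain Java sketch reproduces that behavior without a Kafka cluster. It is not the sample's source code: the class name RunningWordCount and the method process are hypothetical, and the tokenization (lowercase, then split on non-word characters) is an assumption modeled on the Apache Kafka word count example. On a real cluster, record caching and the commit interval may merge several updates for the same word into one record, so the consumer can show fewer lines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-alone simulation of a running word count; it does not use Kafka.
public class RunningWordCount {

    // Returns one "word count" update per word occurrence, in arrival order.
    static List<String> process(List<String> messages) {
        Map<String, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String message : messages) {
            // Assumption: lowercase the message, then split on runs of non-word characters.
            for (String word : message.toLowerCase(Locale.ROOT).split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // split() yields an empty first token when the message starts with a separator
                }
                // Increment the count for this word and emit the updated value.
                long updated = counts.merge(word, 1L, Long::sum);
                updates.add(word + " " + updated);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        // The four messages written to the input topic in the steps above.
        List<String> messages = Arrays.asList(
                "This is Kafka Streams test",
                "test starting",
                "now Kafka Streams is running",
                "test end");
        process(messages).forEach(System.out::println);
    }
}
```

Running main prints the same 14 lines as the consumer output above. Because the output topic uses the compact cleanup policy, older records for a word can eventually be discarded by the broker, leaving only the latest count per word.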