Low-Level Streams Sample Commissioning
Commissioning Applications on Windows
For details about how to commission applications on Windows, see Commissioning Applications on Windows.
Commissioning Applications on Linux
- Compile the sample project to generate a JAR package, and copy the JAR package to the src/main/resources directory at the same level as the dependent library folder. For details, see Commissioning Applications on Linux.
- Log in as user root to the node where the cluster client is installed, and run the following commands to go to the client directory and configure environment variables:
cd /opt/client
source bigdata_env
- Create an input topic and an output topic. Ensure that the topic names are the same as those specified in the sample code. Set the cleanup policy of the output topic to compact.
kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 1 --partitions 1 --topic Topic name
To query the IP address of the quorumpeer instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > ZooKeeper, and click the Instance tab. Separate multiple IP addresses with commas (,). You can obtain the ZooKeeper client connection port by querying the clientPort configuration parameter of ZooKeeper; for example, 2181.
Run the following commands:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-input
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-output --config cleanup.policy=compact
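If the cluster has more than one quorumpeer instance, the --zookeeper value is a comma-separated list of IP:port pairs followed by the /kafka path, as described above. The following is a minimal sketch of assembling that connect string in the shell; the three IP addresses are assumed example values, not addresses from a real cluster:

```shell
# Build a ZooKeeper connect string from several quorumpeer instance IPs.
# The IP addresses below are placeholders; 2181 is the example client port.
ips="192.168.0.17 192.168.0.18 192.168.0.19"
port=2181
zk=""
for ip in $ips; do
  zk="${zk}${ip}:${port},"
done
zk="${zk%,}/kafka"   # strip the trailing comma and append the chroot path
echo "$zk"           # prints 192.168.0.17:2181,192.168.0.18:2181,192.168.0.19:2181/kafka
```

The resulting string can then be passed directly to the --zookeeper option of kafka-topics.sh.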
- After the topics are created, run the following command to start the application:
java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.WordCountDemo
- Open a new client window and run the following commands to use kafka-console-producer.sh to write messages to the input topic:
cd /opt/client
source bigdata_env
kafka-console-producer.sh --broker-list IP address of the broker instance:Kafka connection port --topic streams-wordcount-processor-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
- IP address of the broker instance: Log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instance, and query the instance IP address on the instance list page.
- Kafka connection port: If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the broker port number is the value of sasl.port. If Kerberos authentication is disabled (the cluster is in normal mode), the broker port number is the value of port.
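The port selection described above can be sketched in the shell as follows. The configuration file and the port values in it are made-up examples for illustration only; read the actual values from your cluster's Kafka configuration:

```shell
# Sketch: pick the broker port depending on whether the cluster runs in
# security mode (Kerberos enabled). File contents are assumed examples.
cat > /tmp/broker.properties <<'EOF'
port=9092
sasl.port=21007
EOF

security_mode=true   # assumption: set to false for a normal-mode cluster
if [ "$security_mode" = true ]; then
  key=sasl.port      # security mode: use the sasl.port value
else
  key=port           # normal mode: use the port value
fi
grep "^${key}=" /tmp/broker.properties | cut -d= -f2
```

With security_mode=true this prints the sasl.port value from the sample file; with security_mode=false it prints the port value.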
- Open a new client window and run the following commands to use kafka-console-consumer.sh to consume data from the output topic and view the result:
cd /opt/client
source bigdata_env
kafka-console-consumer.sh --topic streams-wordcount-processor-output --bootstrap-server IP address of the broker instance:Kafka connection port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true
In the producer window, write the following messages to the input topic:
>This is Kafka Streams test
>test starting
>now Kafka Streams is running
>test end
The output is as follows:
this 1
is 1
kafka 1
streams 1
test 1
test 2
starting 1
now 1
kafka 2
streams 2
is 2
running 1
test 3
end 1
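The output above is cumulative: the sample topology lowercases each word and emits an updated (word, count) record every time the word appears, so the same key shows up repeatedly with an increasing count (the compact cleanup policy on the output topic eventually keeps only the latest count per key). As a rough illustration, the same running counts can be reproduced outside Kafka with a small awk script fed the same four messages:

```shell
# Reproduce the running word counts from the sample input.
# Each word is lowercased and its updated count is printed immediately,
# mirroring the (key, value) records written to the output topic.
printf '%s\n' \
  "This is Kafka Streams test" \
  "test starting" \
  "now Kafka Streams is running" \
  "test end" |
awk '{ for (i = 1; i <= NF; i++) { w = tolower($i); count[w]++; print w, count[w] } }'
```

This prints the same 14 records shown above, from "this 1" through "test 3" and "end 1".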