Overview
Scenarios
Assume that the Kafka component receives one word record every second.
The Spark application to be developed must do the following:
Calculate the total number of records for each word in real time.
log1.txt example file:
LiuYang YuanJing GuoYijun CaiXuyu Liyuan FangBo LiuYang YuanJing GuoYijun CaiXuyu FangBo
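To make the expected result concrete, the following plain-Java sketch counts the words in the log1.txt sample above. It does not use Spark or Kafka; the class name WordTotals and its count method are hypothetical helpers for illustration and are not part of the sample project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the sample project.
class WordTotals {
    // Counts how often each whitespace-separated word occurs, in first-seen order.
    static Map<String, Integer> count(String text) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String word : text.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                totals.merge(word, 1, Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Content of the log1.txt example file.
        String log1 = "LiuYang YuanJing GuoYijun CaiXuyu Liyuan FangBo "
                + "LiuYang YuanJing GuoYijun CaiXuyu FangBo";
        count(log1).forEach((word, total) -> System.out.println(word + "," + total));
    }
}
```

For the sample data, every name is counted twice except Liyuan, which appears once. The streaming application produces the same kind of per-word totals, but updates them continuously as records arrive.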
Data Planning
- Ensure that the cluster, including HDFS, Yarn, Spark, and Kafka, is successfully installed.
- Create the input_data1.txt file on the local host and copy the content of the log1.txt file into it.
On the client installation node, create the /home/data directory and upload the input_data1.txt file to it.
- Create a topic.
{zkQuorum} indicates ZooKeeper cluster information. The format is IP:port.
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}
- Start the Producer of Kafka and send data to Kafka.
java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}
In this command, {ClassPath} must contain both the path of the sample JAR package and the absolute path of the Kafka JAR packages on the Spark client, for example: /opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}.
Development Approach
- Receive data from Kafka and generate DStream.
- Count the word records, grouped by word.
- Calculate and print the result.
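The three steps above can be pictured with a plain-Java simulation in which each call to update stands for one streaming batch. This is only a sketch of the counting logic, assuming that totals accumulate across batches; it does not use the Spark DStream API, and the class name RunningWordCount is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the counting logic; not the Spark API.
class RunningWordCount {
    // Running total per word, kept across batches.
    private final Map<String, Long> totals = new TreeMap<>();

    // Folds one batch of word records into the totals and returns a snapshot.
    Map<String, Long> update(List<String> batch) {
        for (String word : batch) {
            totals.merge(word, 1L, Long::sum);
        }
        return new TreeMap<>(totals);
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        // Each list stands for the records received from Kafka in one batch.
        System.out.println(counter.update(Arrays.asList("LiuYang", "YuanJing", "LiuYang")));
        System.out.println(counter.update(Arrays.asList("YuanJing", "FangBo")));
    }
}
```

In the real application, the batches come from the Kafka DStream and the state is managed by Spark rather than by an in-memory map.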
Packaging the Project
- Use the Maven tool provided by IDEA to package the project and generate a JAR file. For details, see Compiling and Running the Application.
- Upload the JAR package to any directory (for example, /opt) on the server where the Spark client is located.
- Prepare dependency packages and upload the following JAR packages to the $SPARK_HOME/jars/streamingClient010 directory on the server where the Spark client is located.
- spark-streaming-kafkaWriter-0-10_2.12-3.1.1-hw-ei-311001.jar
- kafka-clients-xxx.jar
- spark-sql-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
- spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
- spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
NOTE
- Download dependency packages whose version number contains "hw-ei" from the Huawei open-source mirror site. For details, see Configuring Huawei Open-Source Mirrors.
- Obtain dependency packages whose version number does not contain "hw-ei" from the Maven central repository. For details, see Obtaining Sample Projects.
Running Tasks
When running the sample program, you need to specify <checkpointDir>, <brokers>, <topic>, and <batchTime>.
- <checkpointDir> indicates the HDFS path for storing the program result backup.
- <brokers> indicates the Kafka address for obtaining metadata.
- <topic> indicates the name of the Kafka topic to read from.
- <batchTime> indicates the interval at which Streaming processes data in batches.
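A minimal sketch of how an application might validate these four arguments before starting the streaming job is shown below. The class name JobArgs is hypothetical, and treating <batchTime> as a positive whole number is an assumption; the actual sample code may parse its arguments differently.

```java
// Hypothetical argument holder for illustration; not part of the sample project.
class JobArgs {
    final String checkpointDir;
    final String brokers;
    final String topic;
    final long batchTime;

    private JobArgs(String checkpointDir, String brokers, String topic, long batchTime) {
        this.checkpointDir = checkpointDir;
        this.brokers = brokers;
        this.topic = topic;
        this.batchTime = batchTime;
    }

    // Validates <checkpointDir> <brokers> <topic> <batchTime> in that order.
    static JobArgs parse(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "Usage: <checkpointDir> <brokers> <topic> <batchTime>");
        }
        // Assumption: batchTime is a positive whole number.
        long batchTime = Long.parseLong(args[3]);
        if (batchTime <= 0) {
            throw new IllegalArgumentException("<batchTime> must be positive");
        }
        return new JobArgs(args[0], args[1], args[2], batchTime);
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with the values for your cluster.
        JobArgs parsed = parse(new String[] {"/tmp/checkpoint", "192.168.0.1:9092", "wordTopic", "5"});
        System.out.println(parsed.checkpointDir + " " + parsed.brokers + " "
                + parsed.topic + " " + parsed.batchTime);
    }
}
```

Failing fast on a missing or malformed argument avoids submitting a job to Yarn that would only fail after startup.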
NOTE
On the client, the Kafka dependency packages of Spark Streaming are stored in a different path from other dependency packages: other dependency packages are in $SPARK_HOME/jars, whereas the Kafka dependency packages are in $SPARK_HOME/jars/streamingClient010. Therefore, when running an application, add the --jars option to the spark-submit command to specify the path of the Spark Streaming Kafka dependency packages, for example, --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}").
- Sample command (Spark Streaming reads from Kafka 0-10 and prints the result)
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
- Sample command (Spark Streaming writes to Kafka 0-10)
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter /opt/SparkStreamingKafka010JavaExample-1.0.jar <groupId> <brokers> <topics>
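The shell expression passed to --jars in the commands above expands to a comma-separated list of every JAR file in the streamingClient010 directory. The Java sketch below builds the same kind of list, which may help when the submit command is assembled programmatically. The class name JarList is hypothetical, and the sorting is added only to make the output deterministic.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of the sample project.
class JarList {
    // Returns the paths of all *.jar files directly under dir, joined with commas.
    static String commaSeparatedJars(Path dir) throws IOException {
        List<String> jars = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : stream) {
                jars.add(jar.toString());
            }
        }
        Collections.sort(jars);
        return String.join(",", jars);
    }

    public static void main(String[] args) throws IOException {
        // Pass the dependency directory as the first argument; defaults to the current directory.
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(commaSeparatedJars(dir));
    }
}
```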