Overview

Updated on 2022-11-18 GMT+08:00

Scenarios

Assume that the Kafka component receives one word record every second.

The Spark application to be developed must provide the following function:

Count the total number of records for each word in real time.

log1.txt example file:

LiuYang
YuanJing
GuoYijun
CaiXuyu
Liyuan
FangBo
LiuYang
YuanJing
GuoYijun
CaiXuyu
FangBo
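
To make the expected result concrete, the totals for the example file can be checked with a small, dependency-free Java sketch. It does not use Spark or Kafka, and the class and method names are hypothetical (they are not part of the sample project). For the records above, every word is counted twice except Liyuan, which is counted once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the sample project.
final class ExpectedWordTotals {

    // Records copied from the log1.txt example, one word per record.
    static final List<String> LOG1_RECORDS = Arrays.asList(
            "LiuYang", "YuanJing", "GuoYijun", "CaiXuyu", "Liyuan", "FangBo",
            "LiuYang", "YuanJing", "GuoYijun", "CaiXuyu", "FangBo");

    // Groups identical words and counts how many records each group holds.
    static Map<String, Long> countRecords(List<String> records) {
        return records.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Prints one "word,total" line per distinct word, in alphabetical order.
        countRecords(LOG1_RECORDS).forEach((word, total) -> System.out.println(word + "," + total));
    }
}
```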

Data Planning

The data for the Spark Streaming sample project is stored in the Kafka component. A user with Kafka permission sends the data to the Kafka component.
  1. Ensure that the cluster, including HDFS, Yarn, Spark, and Kafka, is successfully installed.
  2. Create the input_data1.txt file locally and copy the content of the log1.txt file into it.

    On the client installation node, create the /home/data directory and upload the input_data1.txt file to the /home/data directory.

  3. Create a topic.

    {zkQuorum} indicates the ZooKeeper cluster information, in the format IP:port.

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

  4. Start the Kafka producer and send data to Kafka.

    java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}

    In this command, ClassPath must contain the absolute path of the Kafka JAR package on the Spark client in addition to the path of the sample JAR package, for example: /opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}.

Development Approach

  1. Receive data from Kafka and generate DStream.
  2. Group the records by word and count the records for each word.
  3. Calculate and print the result.
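
The three steps can be illustrated with a plain Java sketch that simulates micro-batches in memory. This is only an illustration under stated assumptions: it uses no Spark or Kafka classes, the batches are hard-coded instead of being received from a topic, and the class and method names are hypothetical. The code of the actual sample project may be structured differently.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the streaming job; no Spark or Kafka classes are used.
final class RunningWordCountSketch {

    // Running total per word, kept across batches.
    private final Map<String, Long> totals = new HashMap<>();

    // Steps 2 and 3 for one batch: count the words in the batch, add the
    // counts to the running totals, and return a snapshot of the totals.
    Map<String, Long> processBatch(List<String> batch) {
        Map<String, Long> batchCounts = new HashMap<>();
        for (String word : batch) {
            batchCounts.merge(word, 1L, Long::sum);
        }
        batchCounts.forEach((word, count) -> totals.merge(word, count, Long::sum));
        return new HashMap<>(totals);
    }

    public static void main(String[] args) {
        RunningWordCountSketch job = new RunningWordCountSketch();
        // Step 1 is simulated: each inner list stands for the records of one batch.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("LiuYang", "YuanJing", "GuoYijun"),
                Arrays.asList("CaiXuyu", "Liyuan", "FangBo", "LiuYang"));
        for (List<String> batch : batches) {
            System.out.println(job.processBatch(batch));
        }
    }
}
```

Returning a copy of the totals keeps each printed snapshot independent of later batches.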

Packaging the Project

  • Use the Maven tool provided by IDEA to package the project and generate a JAR file. For details, see Compiling and Running the Application.
  • Upload the JAR package to any directory (for example, /opt) on the server where the Spark client is located.
  • Prepare dependency packages and upload the following JAR packages to the $SPARK_HOME/jars/streamingClient010 directory on the server where the Spark client is located.
    • spark-streaming-kafkaWriter-0-10_2.12-3.1.1-hw-ei-311001.jar
    • kafka-clients-xxx.jar
    • spark-sql-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
    • spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
    • spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
NOTE:
  • Download dependency packages whose version number contains "hw-ei" from the Huawei open-source image site. For details, see Configuring Huawei Open-Source Mirrors.
  • Obtain dependency packages whose version number does not contain "hw-ei" from the Maven central repository. For details, see Obtaining Sample Projects.

Running Tasks

When running the sample program, you need to specify <checkpointDir>, <brokers>, <topic>, and <batchTime>:
  • <checkpointDir> indicates the HDFS path for storing the backup of the program results.
  • <brokers> indicates the Kafka address for obtaining metadata.
  • <topic> indicates the name of the topic read from Kafka.
  • <batchTime> indicates the interval at which Streaming processes data in batches.
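
As an illustration of how a program might validate these four arguments before starting, the following Java sketch parses them in the order listed above. The class name, the error messages, and the rule that <batchTime> must be a positive whole number are assumptions made for this sketch. This document does not state the unit of <batchTime>, so the sketch does not assume one.

```java
// Hypothetical argument holder; not part of the sample project.
final class WordCountArgs {
    final String checkpointDir;
    final String brokers;
    final String topic;
    final long batchTime; // Unit is not stated in this document.

    private WordCountArgs(String checkpointDir, String brokers, String topic, long batchTime) {
        this.checkpointDir = checkpointDir;
        this.brokers = brokers;
        this.topic = topic;
        this.batchTime = batchTime;
    }

    // Expects exactly: <checkpointDir> <brokers> <topic> <batchTime>
    static WordCountArgs parse(String[] args) {
        if (args.length != 4) {
            throw new IllegalArgumentException(
                    "Usage: <checkpointDir> <brokers> <topic> <batchTime>");
        }
        long batchTime;
        try {
            batchTime = Long.parseLong(args[3]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("<batchTime> must be a whole number: " + args[3], e);
        }
        if (batchTime <= 0) {
            throw new IllegalArgumentException("<batchTime> must be positive: " + batchTime);
        }
        return new WordCountArgs(args[0], args[1], args[2], batchTime);
    }
}
```

Failing fast on a wrong argument count or a non-numeric interval surfaces mistakes on the client before the job is submitted to Yarn.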

NOTE:

On the client, the Kafka dependency packages of Spark Streaming are stored in a different path from other dependency packages: other dependency packages are in $SPARK_HOME/jars, whereas the Kafka dependency packages are in $SPARK_HOME/jars/streamingClient010. Therefore, when running an application, you need to add a configuration item to the spark-submit command to specify the path of the Spark Streaming Kafka dependency packages, for example, --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}").
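
The shell expression in the --jars example expands every *.jar file in the directory and joins the paths with commas, which is the list format that --jars accepts. The following Java sketch shows the same join for readers who want to build the value programmatically. The class and method names are hypothetical, and the explicit sorting is a choice made in this sketch for a predictable order.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the sample project.
final class JarListSketch {

    // Keeps entries ending in ".jar", sorts them, and joins them with commas.
    static String toJarsOption(List<String> paths) {
        return paths.stream()
                .filter(path -> path.endsWith(".jar"))
                .sorted()
                .collect(Collectors.joining(","));
    }

    // Lists the *.jar files directly inside the given directory and joins them.
    static String fromDirectory(Path dir) throws IOException {
        List<String> paths = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                paths.add(jar.toString());
            }
        }
        return toJarsOption(paths);
    }

    public static void main(String[] args) throws IOException {
        // Pass the dependency directory, for example $SPARK_HOME/jars/streamingClient010.
        System.out.println(fromDirectory(Paths.get(args[0])));
    }
}
```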

Go to the Spark client directory and invoke the bin/spark-submit script to run the code. The class name and file name must be the same as those in the actual code; the following commands are only examples.
  • Sample code (Spark Streaming read Kafka 0-10 Write To Print)

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

  • Sample code (Spark Streaming Write To Kafka 0-10)

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter /opt/SparkStreamingKafka010JavaExample-1.0.jar <groupId> <brokers> <topics>
