Consuming Kafka Data Using Spark Streaming Jobs
Application Scenarios
Use an MRS cluster to run Spark Streaming jobs to consume Kafka data.
Assume that Kafka receives one word record every second in a service. The Spark applications developed based on service requirements implements the function of accumulating the total number of records of each word in real time.
Spark Streaming sample projects store data in and send data to Kafka.
Solution Architecture
Spark is a distributed batch processing framework. It provides analysis and mining and iterative memory computing capabilities and supports application development in multiple programming languages, including Scala, Java, and Python. Spark applies to the following scenarios:
- Data processing: Spark can process data quickly and has fault tolerance and scalability.
- Iterative computation: Spark supports iterative computation to meet the requirements of multi-step data processing logic.
- Data mining: Based on massive data, Spark can handle complex data mining and analysis and support multiple data mining and machine learning algorithms.
- Streaming processing: Spark supports streaming processing with a seconds-level delay and supports multiple external data sources.
- Query analysis: Spark supports standard SQL query analysis, provides the DSL (DataFrame), and supports multiple external inputs.
Spark Streaming is a real-time computing framework built on the Spark, which expands the capability for processing massive streaming data. Spark supports two data processing approaches: Direct Streaming and Receiver.
In Direct Streaming approach, Direct API is used to process data. Take Kafka Direct API as an example. Direct API provides offset location that each batch range will read from, which is much simpler than starting a receiver to continuously receive data from Kafka and written data to WALs. Then, each batch job is running and the corresponding offset data is ready in Kafka. These offset information can be securely stored in the checkpoint file and read by applications that failed to start.
After the failure, Spark Streaming can read data from Kafka again and process the data segment. The processing result is the same no matter Spark Streaming fails or not, because the semantic is processed only once.
Direct API does not need to use the WAL and Receivers, and ensures that each Kafka record is received only once, which is more efficient. In this way, the Spark Streaming and Kafka can be well integrated, making streaming channels be featured with high fault-tolerance, high efficiency, and ease-of-use. Therefore, you are advised to use Direct Streaming to process data.
When a Spark Streaming application starts (that is, when the driver starts), the related StreamingContext (the basis of all streaming functions) uses SparkContext to start the receiver to become a long-term running task. Receiver receives and stores streaming data to the Spark memory for processing. Figure 2 shows the data transfer lifecycle.
- Receive data (blue arrow).
Receiver divides a data stream into a series of blocks and stores them in the executor memory. In addition, after WAL is enabled, it writes data to the WAL of the fault-tolerant file system.
- Notify the driver (green arrow).
The metadata in the received block is sent to StreamingContext in the driver. The metadata includes:
- Block reference ID used to locate the data position in the Executor memory.
- Block data offset information in logs (if the WAL function is enabled).
- Process data (red arrow).
For each batch of data, StreamingContext uses block information to generate resilient distributed datasets (RDDs) and jobs. StreamingContext executes jobs by running tasks to process blocks in the executor memory.
- Periodically set checkpoints (orange arrows).
- For fault tolerance, StreamingContext periodically sets checkpoints and saves them to external file systems.
Procedure
Huawei Cloud MRS provides sample development projects for Spark in multiple scenarios. The development guideline for the scenario in this practice is as follows:
- Receive data from Kafka and generate the corresponding DStream.
- Classify word records.
- Compute the result and print it.
Step 1: Creating an MRS Cluster
- Create and purchase an MRS cluster that contains the Spark2x and Kafka components. For details, see Buying a Custom Cluster.
In this practice, an MRS 3.1.0 cluster with Kerberos authentication disabled is used as an example.
- After the cluster is purchased, install the cluster client on any node of the cluster. For details, see Installing and Using the Cluster Client.
Assume that the client is installed in /opt/client.
Step 2: Preparing Applications
- Obtain the sample project from Huawei Mirrors.
Download the Maven project source code and configuration files of the sample project, and configure related development tools on the local host. For details, see Obtaining Sample Projects from Huawei Mirrors.
Select a sample project based on the cluster version and download the sample project.
For example, to obtain SparkStreamingKafka010JavaExample, visit https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/spark-examples/sparknormal-examples/SparkStreamingKafka010JavaExample.
- Use the IDEA tool to import the sample project and wait for the Maven project to download the dependency package. For details, see Configuring and Importing Sample Projects.
In this example project, Streaming is used to call the Kafka API to obtain word records, and word records are classified to obtain the number of records of each word. The key code snippets are as follows:
public class StreamingExampleProducer { public static void main(String[] args) throws IOException { if (args.length < 2) { printUsage(); } String brokerList = args[0]; String topic = args[1]; String filePath = "/home/data/"; //Path for obtaining the source data Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int m = 0; m < Integer.MAX_VALUE / 2; m++) { File dir = new File(filePath); File[] files = dir.listFiles(); if (files != null) { for (File file : files) { if (file.isDirectory()) { System.out.println(file.getName() + "This is a directory!"); } else { BufferedReader reader = null; reader = new BufferedReader(new FileReader(filePath + file.getName())); String tempString = null; while ((tempString = reader.readLine()) != null) { // Blank line judgment if (!tempString.isEmpty()) { producer.send(new ProducerRecord<String, String>(topic, tempString)); } } // make sure the streams are closed finally. reader.close(); } } } try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void printUsage() { System.out.println("Usage: {brokerList} {topic}"); } }
- After Maven and SDK parameters are configured on the local host, the sample project automatically loads related dependency packages. After the loading is complete, double-click package to obtain the JAR file.
For example, the packaged JAR file is SparkStreamingKafka010JavaExample-1.0.jar.
Step 3: Uploading the JAR Package and Source Data
- Prepare the source data to be sent to Kafka, for example, the following input_data.txt file. Upload the file to the /home/data directory on the client node.
ZhangSan LiSi WangwWU Tom Jemmmy LinDa
- Upload the compiled JAR package to a directory, for example, /opt, on the client node.
If you cannot directly connect to the client node to upload files through the local network, upload the JAR file or source data to OBS, import the file to HDFS on the Files tab page of the MRS cluster, and run the hdfs dfs -get command on the HDFS client to download the file to the client node.
Step 4: Running the Job and Viewing the Result
- Log in to the node where the cluster client is installed as user root.
cd /opt/client
source bigdata_env
- Create a Kafka topic for receiving data.
kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port /kafka --replication-factor 2 --partitions 3 --topic Topic name
To query the IP address of the quorumpeer instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > ZooKeeper, and click the Instance tab. Use commas (,) to separate multiple IP addresses. You can query the ZooKeeper client connection port by querying the ZooKeeper service configuration parameter clientPort. The default value is 2181.
For example, run the following command:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka
Created topic sparkkafka.
- After the topic is created, execute the program to send data to Kafka.
java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer IP address of the Broker instance:Kafka connection port Topic name
To query the IP address of the Kafka Broker instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > Kafka, and click the Instance tab. Use commas (,) to separate multiple IP addresses. You can query the Kafka connection port by querying the Kafka service configuration parameter port. The default value is 9092.
For example, run the following command:
java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka
... transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2022-06-08 15:43:42 INFO AppInfoParser:117 - Kafka version: xxx 2022-06-08 15:43:42 INFO AppInfoParser:118 - Kafka commitId: xxx 2022-06-08 15:43:42 INFO AppInfoParser:119 - Kafka startTimeMs: xxx 2022-06-08 15:43:42 INFO Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A
- Open a new client connection window and run the following commands to read data from the Kafka topic:
cd /opt/client/Spark2x/spark
source bigdata_env
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
- <checkPointDir> indicates the HDFS path for backing up application results, for example, /tmp.
- <brokers> indicates the Kafka address for obtaining metadata, in the format of IP address of the Broker instance:Kafka connection port.
- <topic> indicates the topic name read from Kafka.
- <batchTime> indicates the interval for Streaming processing in batches, for example, 5.
For example, run the following commands:
cd /opt/client/Spark2x/spark
source bigdata_env
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5
After the program is executed, you can view the data statistics in Kafka.
.... ------------------------------------------- Time: 1654674380000 ms ------------------------------------------- (ZhangSan,6) (Tom,6) (LinDa,6) (WangwWU,6) (LiSi,6) (Jemmmy,6) ------------------------------------------- Time: 1654674385000 ms ------------------------------------------- (ZhangSan,717) (Tom,717) (LinDa,717) (WangwWU,717) (LiSi,717) (Jemmmy,717) ------------------------------------------- Time: 1654674390000 ms ------------------------------------------- (ZhangSan,2326) (Tom,2326) (LinDa,2326) (WangwWU,2326) (LiSi,2326) (Jemmmy,2326) ...
- Log in to FusionInsight Manager and choose Cluster > Services > Spark2x.
- On the Dashboard tab page that is displayed, click the link next to Spark WebUI to access the History Server web UI.
Click a job ID to view the status of the Spark Streaming job.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.