
Consuming Kafka Data Using Spark Streaming Jobs

Application Scenarios

Use an MRS cluster to run Spark Streaming jobs to consume Kafka data.

Assume that Kafka receives one word record every second in a service. The Spark application developed based on service requirements accumulates the total number of records of each word in real time.

The Spark Streaming sample project both writes data to Kafka and consumes data from Kafka.

Solution Architecture

Spark is a distributed batch processing framework. It provides analysis and mining capabilities as well as iterative in-memory computing, and supports application development in multiple programming languages, including Scala, Java, and Python. Spark applies to the following scenarios:

  • Data processing: Spark can process data quickly and has fault tolerance and scalability.
  • Iterative computation: Spark supports iterative computation to meet the requirements of multi-step data processing logic.
  • Data mining: Based on massive data, Spark can handle complex data mining and analysis and support multiple data mining and machine learning algorithms.
  • Streaming processing: Spark supports stream processing with second-level latency and supports multiple external data sources.
  • Query analysis: Spark supports standard SQL query analysis, provides a DataFrame-based DSL, and supports multiple external inputs.

Spark Streaming is a real-time computing framework built on Spark, extending its capability to process massive streaming data. Spark Streaming supports two data processing approaches: Direct Streaming and Receiver.

In the Direct Streaming approach, the Direct API is used to process data. Take the Kafka Direct API as an example. The Direct API provides the offset range that each batch reads, which is much simpler than starting a receiver to continuously pull data from Kafka and write it to write-ahead logs (WALs). Each batch job then runs against the corresponding offsets, which are already available in Kafka. The offset information can be reliably stored in the checkpoint file and read back by applications that are restarted after a failure.

Figure 1 Data transmission through Direct Kafka API

After a failure, Spark Streaming can re-read the data from Kafka and process the same data segment. The processing result is the same regardless of whether Spark Streaming fails, because exactly-once semantics are provided.

The Direct API does not require WALs or receivers, and it ensures that each Kafka record is received only once, which is more efficient. In this way, Spark Streaming integrates well with Kafka, making streaming channels highly fault-tolerant, efficient, and easy to use. Therefore, you are advised to use Direct Streaming to process data.
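
For illustration, the following fragment (not part of the sample project) shows how the Direct API exposes the offsets that each batch reads. It assumes that stream is a JavaInputDStream<ConsumerRecord<String, String>> created with KafkaUtils.createDirectStream, as in the consumer sketch under Procedure below.

    // Each batch RDD produced by the Direct API carries the exact Kafka offset
    // range it read; no receiver or WAL is involved.
    stream.foreachRDD(rdd -> {
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange range : ranges) {
            System.out.println(range.topic() + "-" + range.partition()
                    + ": offsets [" + range.fromOffset() + ", " + range.untilOffset() + ")");
        }
    });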

When a Spark Streaming application starts (that is, when the driver starts), the related StreamingContext (the basis of all streaming functions) uses SparkContext to start the receiver as a long-running task. The receiver receives streaming data and stores it in Spark memory for processing. Figure 2 shows the data transfer lifecycle.

Figure 2 Data transfer lifecycle
  1. Receive data (blue arrow).

    The receiver divides the data stream into a series of blocks and stores them in the executor memory. In addition, if the WAL is enabled, it also writes the data to the WAL of the fault-tolerant file system.

  2. Notify the driver (green arrow).

    The metadata of the received blocks is sent to the StreamingContext in the driver. The metadata includes:

    • The block reference ID, used to locate the data in the executor memory.
    • The offset of the block data in the log (if the WAL function is enabled).
  3. Process data (red arrow).

    For each batch of data, StreamingContext uses block information to generate resilient distributed datasets (RDDs) and jobs. StreamingContext executes jobs by running tasks to process blocks in the executor memory.

  4. Periodically set checkpoints (orange arrows).

    For fault tolerance, StreamingContext periodically sets checkpoints and saves them to external file systems, as shown in the sketch below.
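
The following is a minimal sketch of how these fault-tolerance features are enabled in application code; it is not part of the sample project. The usual Spark Streaming imports (SparkConf, JavaStreamingContext, Durations) are assumed, and the application name and checkpoint path are placeholders.

    // Receiver-based approach only: persist received data to a write-ahead log.
    // This must be set on the SparkConf before the StreamingContext is created.
    SparkConf sparkConf = new SparkConf().setAppName("StreamingFaultToleranceSketch");
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

    // Periodically checkpoint streaming metadata and state to a fault-tolerant
    // file system such as HDFS (placeholder path).
    jssc.checkpoint("/tmp/spark-streaming-checkpoint");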

Procedure

Huawei Cloud MRS provides sample development projects for Spark in multiple scenarios. The development guideline for the scenario in this practice is as follows (a minimal code sketch of these steps is provided after the list):

  1. Receive data from Kafka and generate the corresponding DStream.
  2. Classify word records.
  3. Compute the result and print it.
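
The com.huawei.bigdata.spark.examples.KafkaWordCount class invoked later in this practice implements this logic; refer to the sample project for its exact source. The following is a minimal, self-contained sketch of the same approach using the spark-streaming-kafka-0-10 Direct API, where updateStateByKey keeps a running total per word. The class name KafkaWordCountSketch, the consumer group ID, and the hard-coded broker address, topic, batch interval, and checkpoint path are illustrative placeholders.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    import scala.Tuple2;

    public class KafkaWordCountSketch {
        public static void main(String[] args) throws InterruptedException {
            SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCountSketch");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
            // updateStateByKey requires a checkpoint directory.
            jssc.checkpoint("/tmp");

            // Kafka consumer parameters; the broker address and group ID are placeholders.
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "192.168.0.131:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "DemoConsumer");

            // 1. Receive data from Kafka and generate the corresponding DStream.
            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(
                            Collections.singletonList("sparkkafka"), kafkaParams));

            // 2. Classify word records: one (word, 1) pair per received record.
            JavaPairDStream<String, Integer> pairs =
                    stream.mapToPair(record -> new Tuple2<>(record.value(), 1));

            // 3. Compute the cumulative count of each word across batches and print it.
            Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateTotal =
                    (newValues, total) -> {
                        int sum = total.isPresent() ? total.get() : 0;
                        for (Integer value : newValues) {
                            sum += value;
                        }
                        return Optional.of(sum);
                    };
            JavaPairDStream<String, Integer> totals = pairs.updateStateByKey(updateTotal);
            totals.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }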

Step 1: Creating an MRS Cluster

  1. Create and purchase an MRS cluster that contains the Spark2x and Kafka components. For details, see Buying a Custom Cluster.

    In this practice, an MRS 3.1.0 cluster with Kerberos authentication disabled is used as an example.

  2. After the cluster is purchased, install the cluster client on any node of the cluster. For details, see Installing and Using the Cluster Client.

    Assume that the client is installed in /opt/client.

Step 2: Preparing Applications

  1. Obtain the sample project from Huawei Mirrors.

    Download the Maven project source code and configuration files of the sample project, and configure related development tools on the local host. For details, see Obtaining Sample Projects from Huawei Mirrors.

    Select a sample project based on the cluster version and download it.

    For example, to obtain SparkStreamingKafka010JavaExample, visit https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/spark-examples/sparknormal-examples/SparkStreamingKafka010JavaExample.

  2. Use the IDEA tool to import the sample project and wait for the Maven project to download the dependency package. For details, see Configuring and Importing Sample Projects.

    In this sample project, Streaming calls the Kafka API to obtain word records and then classifies the word records to count the number of records of each word. The key code snippets are as follows:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class StreamingExampleProducer {
        public static void main(String[] args) throws IOException {
            if (args.length < 2) {
                printUsage();
                // Exit to avoid reading missing arguments below.
                return;
            }
            String brokerList = args[0];
            String topic = args[1];
            String filePath = "/home/data/";    //Path for obtaining the source data
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
    
            for (int m = 0; m < Integer.MAX_VALUE / 2; m++) {
                File dir = new File(filePath);
                File[] files = dir.listFiles();
                if (files != null) {
                    for (File file : files) {
                        if (file.isDirectory()) {
                            System.out.println(file.getName() + " is a directory!");
                        } else {
                            // try-with-resources makes sure the reader is closed even if reading fails.
                            try (BufferedReader reader = new BufferedReader(new FileReader(filePath + file.getName()))) {
                                String tempString;
                                while ((tempString = reader.readLine()) != null) {
                                    // Skip blank lines.
                                    if (!tempString.isEmpty()) {
                                        producer.send(new ProducerRecord<String, String>(topic, tempString));
                                    }
                                }
                            }
                        }
                    }
                }
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private static void printUsage() {
            System.out.println("Usage: {brokerList} {topic}");
        }
    }

  3. After Maven and SDK parameters are configured on the local host, the sample project automatically loads related dependency packages. After the loading is complete, double-click package in the Maven view to build the JAR file.

    For example, the packaged JAR file is SparkStreamingKafka010JavaExample-1.0.jar.
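
    Alternatively, assuming Maven is available on the local host's command line, you can build the package from the root directory of the sample project:

    mvn clean package

    The generated JAR file is located in the target directory of the project.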

Step 3: Uploading the JAR Package and Source Data

  1. Prepare the source data to be sent to Kafka, for example, the following input_data.txt file. Upload the file to the /home/data directory on the client node.

    ZhangSan
    LiSi
    WangwWU
    Tom
    Jemmmy
    LinDa

  2. Upload the compiled JAR package to a directory, for example, /opt, on the client node.

    If you cannot directly connect to the client node to upload files over the local network, upload the JAR file or source data to OBS, import the file to HDFS on the Files tab page of the MRS cluster, and then run the hdfs dfs -get command on the HDFS client to download the file to the client node, as shown in the following example.
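
    For example, assuming the JAR file has been imported to the /tmp directory of HDFS, run the following command on the client node:

    hdfs dfs -get /tmp/SparkStreamingKafka010JavaExample-1.0.jar /opt/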

Step 4: Running the Job and Viewing the Result

  1. Log in as user root to the node where the cluster client is installed, and run the following commands to load the client environment variables:

    cd /opt/client

    source bigdata_env

  2. Create a Kafka topic for receiving data.

    kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 2 --partitions 3 --topic Topic name

    To query the IP address of the quorumpeer instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > ZooKeeper, and click the Instance tab. Use commas (,) to separate multiple IP addresses. To obtain the ZooKeeper client connection port, check the ZooKeeper service configuration parameter clientPort. The default value is 2181.

    For example, run the following command:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka

    Created topic sparkkafka.

  3. After the topic is created, execute the program to send data to Kafka.

    java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer IP address of the Broker instance:Kafka connection port Topic name

    To query the IP address of the Kafka Broker instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > Kafka, and click the Instance tab. Use commas (,) to separate multiple IP addresses. To obtain the Kafka connection port, check the Kafka service configuration parameter port. The default value is 9092.

    For example, run the following command:

    java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka

    ...
    	transactional.id = null
    	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
    2022-06-08 15:43:42 INFO  AppInfoParser:117 - Kafka version: xxx
    2022-06-08 15:43:42 INFO  AppInfoParser:118 - Kafka commitId: xxx
    2022-06-08 15:43:42 INFO  AppInfoParser:119 - Kafka startTimeMs: xxx
    2022-06-08 15:43:42 INFO  Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A

  4. Open a new client connection window and run the following commands to read data from the Kafka topic:

    cd /opt/client/Spark2x/spark

    source bigdata_env

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

    • <checkpointDir> indicates the HDFS path for storing the application checkpoint data, for example, /tmp.
    • <brokers> indicates the Kafka address for obtaining metadata, in the format of IP address of the Broker instance:Kafka connection port.
    • <topic> indicates the topic name read from Kafka.
    • <batchTime> indicates the batch interval (in seconds) for Streaming processing, for example, 5.

    For example, run the following commands:

    cd /opt/client/Spark2x/spark

    source bigdata_env

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5

    After the program runs, you can view the statistics of the data consumed from Kafka in the command output.

    ....
    -------------------------------------------                                     
    Time: 1654674380000 ms
    -------------------------------------------
    (ZhangSan,6)
    (Tom,6)
    (LinDa,6)
    (WangwWU,6)
    (LiSi,6)
    (Jemmmy,6)
     
    -------------------------------------------                                     
    Time: 1654674385000 ms
    -------------------------------------------
    (ZhangSan,717)
    (Tom,717)
    (LinDa,717)
    (WangwWU,717)
    (LiSi,717)
    (Jemmmy,717)
     
    -------------------------------------------
    Time: 1654674390000 ms
    -------------------------------------------
    (ZhangSan,2326)
    (Tom,2326)
    (LinDa,2326)
    (WangwWU,2326)
    (LiSi,2326)
    (Jemmmy,2326)
     ...

  5. Log in to FusionInsight Manager and choose Cluster > Services > Spark2x.
  6. On the Dashboard tab page that is displayed, click the link next to Spark WebUI to access the History Server web UI.

    Click a job ID to view the status of the Spark Streaming job.