Consuming Kafka Data Using Spark Streaming Jobs

Updated on 2022-12-09 GMT+08:00

Application Scenarios

Use an MRS cluster to run Spark Streaming jobs to consume Kafka data.

Assume that a service sends one word record to Kafka every second. The Spark application developed for this service accumulates the total number of records of each word in real time.

In the Spark Streaming sample project, a producer program sends data to Kafka, and a Spark Streaming job reads and processes the data stored in Kafka.

Solution Architecture

Spark is a distributed batch processing framework. It provides data analysis, data mining, and iterative in-memory computing capabilities, and supports application development in multiple programming languages, including Scala, Java, and Python. Spark applies to the following scenarios:

  • Data processing: Spark can process data quickly and has fault tolerance and scalability.
  • Iterative computation: Spark supports iterative computation to meet the requirements of multi-step data processing logic.
  • Data mining: Spark can perform complex data mining and analysis on massive data and supports multiple data mining and machine learning algorithms.
  • Stream processing: Spark supports stream processing with second-level latency and supports multiple external data sources.
  • Query analysis: Spark supports standard SQL query analysis, provides a DSL (DataFrame), and supports multiple external inputs.

Spark Streaming is a real-time computing framework built on Spark that extends Spark's capability to process massive streaming data. Spark Streaming supports two approaches for receiving data: Direct Streaming and Receiver.

In the Direct Streaming approach, the Direct API is used to process data. Take the Kafka Direct API as an example. For each batch, the Direct API determines the range of offsets to read from Kafka. This is much simpler than starting a receiver that continuously receives data from Kafka and writes it to write-ahead logs (WALs). When each batch job runs, the data for its offset range is already available in Kafka. The offset information can be stored reliably in the checkpoint file and read by the application when it restarts after a failure.

Figure 1 Data transmission through Direct Kafka API
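The following plain-Java sketch models how a direct stream plans one batch: for each partition it reads from the next unprocessed offset (inclusive) up to the latest available offset (exclusive), and after the batch succeeds, the end of the range becomes the new starting position. It is an illustration under simplifying assumptions, not the Spark or Kafka API; the class and method names (DirectBatchPlanner, planBatch, commit) are hypothetical, and the real implementation can additionally limit how much of the range is read per batch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of Direct API batch planning (not the Spark API).
class DirectBatchPlanner {

    // An offset range [fromOffset, untilOffset) for one Kafka partition.
    static final class OffsetRange {
        final int partition;
        final long fromOffset;   // first offset to read (inclusive)
        final long untilOffset;  // stop before this offset (exclusive)

        OffsetRange(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        long count() {
            return untilOffset - fromOffset;
        }

        @Override
        public String toString() {
            return "partition " + partition + ": [" + fromOffset + ", " + untilOffset + ")";
        }
    }

    // For each partition, plan a read from the next unprocessed offset up to the latest offset.
    static Map<Integer, OffsetRange> planBatch(Map<Integer, Long> nextOffsets,
                                               Map<Integer, Long> latestOffsets) {
        Map<Integer, OffsetRange> plan = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            int partition = e.getKey();
            long from = nextOffsets.getOrDefault(partition, 0L);
            long until = Math.max(from, e.getValue()); // never plan a negative range
            plan.put(partition, new OffsetRange(partition, from, until));
        }
        return plan;
    }

    // After the batch succeeds, the end of each range is where the next batch starts.
    static Map<Integer, Long> commit(Map<Integer, OffsetRange> plan) {
        Map<Integer, Long> next = new LinkedHashMap<>();
        for (OffsetRange r : plan.values()) {
            next.put(r.partition, r.untilOffset);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<Integer, Long> next = new LinkedHashMap<>();
        next.put(0, 100L);
        next.put(1, 250L);
        Map<Integer, Long> latest = new LinkedHashMap<>();
        latest.put(0, 130L);
        latest.put(1, 250L);
        planBatch(next, latest).values().forEach(System.out::println);
        // partition 0: [100, 130)
        // partition 1: [250, 250)
    }
}
```

Because each batch is fully described by its offset ranges, storing those ranges in the checkpoint is enough to reconstruct the batch later.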

After a failure, Spark Streaming can read the same data from Kafka again and reprocess that data segment. The processing result is the same whether or not Spark Streaming fails, because the data is processed with exactly-once semantics.
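A minimal sketch, assuming a single partition modeled as an in-memory list, of why reprocessing is safe: a batch is defined only by the offset range stored in the checkpoint, so processing that range again after a restart reads exactly the same records and produces the same counts. The names ReplayFromCheckpoint and processRange are hypothetical. Per the Spark documentation, end-to-end exactly-once results also require the job's output operation to be idempotent or transactional.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: a batch is defined only by its checkpointed offset range,
// so reprocessing the range after a failure reads exactly the same records.
class ReplayFromCheckpoint {

    // Stand-in for one Kafka partition: an append-only log addressed by offset.
    static final List<String> PARTITION_LOG =
            Arrays.asList("Tom", "LiSi", "Tom", "LinDa", "Tom", "LiSi");

    // Count the words in the records at offsets [from, until).
    static Map<String, Integer> processRange(List<String> log, int from, int until) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : log.subList(from, until)) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The offset range is saved in the checkpoint before the batch runs.
        int checkpointFrom = 0;
        int checkpointUntil = 4;

        Map<String, Integer> firstAttempt =
                processRange(PARTITION_LOG, checkpointFrom, checkpointUntil);
        // Suppose the application fails here and restarts: it reads the same
        // range from the checkpoint and processes it again.
        Map<String, Integer> afterRestart =
                processRange(PARTITION_LOG, checkpointFrom, checkpointUntil);

        System.out.println(firstAttempt);                      // {LiSi=1, LinDa=1, Tom=2}
        System.out.println(firstAttempt.equals(afterRestart)); // true
    }
}
```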

The Direct API does not require WALs or receivers and ensures that each Kafka record is received only once, which makes it more efficient. This integrates Spark Streaming closely with Kafka and makes the streaming pipeline fault-tolerant, efficient, and easy to use. Therefore, Direct Streaming is recommended for processing data.

In the Receiver approach, when a Spark Streaming application starts (that is, when the driver starts), the StreamingContext (the basis of all streaming functions) uses the SparkContext to start receivers as long-running tasks. A receiver receives streaming data and stores it in Spark memory for processing. Figure 2 shows the data transfer lifecycle.

Figure 2 Data transfer lifecycle
  1. Receive data (blue arrow).

    The receiver divides a data stream into a series of blocks and stores them in executor memory. If WAL is enabled, it also writes the data to the WAL in a fault-tolerant file system.

  2. Notify the driver (green arrow).

    Metadata about the received blocks is sent to the StreamingContext in the driver. The metadata includes:

    • Block reference IDs, used to locate the data in executor memory.
    • Offsets of the block data in the WAL (if WAL is enabled).
  3. Process data (red arrow).

    For each batch, the StreamingContext uses the block information to generate resilient distributed datasets (RDDs) and jobs. It executes the jobs by running tasks that process the blocks in executor memory.

  4. Periodically set checkpoints (orange arrows).

    For fault tolerance, StreamingContext periodically sets checkpoints and saves them to external file systems.
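The receiver data flow can be modeled in a single process as follows. This sketch covers steps 1 to 3 only (checkpointing is omitted) and splits blocks by record count for simplicity, whereas Spark Streaming cuts blocks by time (spark.streaming.blockInterval). The class ReceiverLifecycleSketch and its members are hypothetical and do not correspond to Spark classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-process model of the receiver data flow (not the Spark API).
class ReceiverLifecycleSketch {

    // Metadata the receiver reports to the driver for each stored block.
    static final class BlockInfo {
        final String blockId;  // reference used to locate the block in executor memory
        final Long walOffset;  // position of the block in the WAL, or null if WAL is off

        BlockInfo(String blockId, Long walOffset) {
            this.blockId = blockId;
            this.walOffset = walOffset;
        }
    }

    final Map<String, List<String>> executorMemory = new LinkedHashMap<>(); // step 1
    final List<String> writeAheadLog = new ArrayList<>();                   // step 1, WAL on
    final List<BlockInfo> driverMetadata = new ArrayList<>();               // step 2
    private final boolean walEnabled;
    private int nextBlock = 0;

    ReceiverLifecycleSketch(boolean walEnabled) {
        this.walEnabled = walEnabled;
    }

    // Steps 1 and 2: split records into blocks, store them, and notify the driver.
    void receive(List<String> records, int recordsPerBlock) {
        for (int i = 0; i < records.size(); i += recordsPerBlock) {
            List<String> block = new ArrayList<>(
                    records.subList(i, Math.min(i + recordsPerBlock, records.size())));
            String blockId = "block-" + nextBlock++;
            executorMemory.put(blockId, block);
            Long walOffset = null;
            if (walEnabled) {
                walOffset = (long) writeAheadLog.size();
                writeAheadLog.addAll(block);
            }
            driverMetadata.add(new BlockInfo(blockId, walOffset));
        }
    }

    // Step 3: turn the blocks reported for this batch into one job and "run" it.
    int processBatch() {
        int processed = 0;
        for (BlockInfo info : driverMetadata) {
            processed += executorMemory.get(info.blockId).size();
        }
        driverMetadata.clear(); // the next batch uses newly reported blocks
        return processed;
    }

    public static void main(String[] args) {
        ReceiverLifecycleSketch s = new ReceiverLifecycleSketch(true);
        s.receive(Arrays.asList("Tom", "LiSi", "LinDa", "Tom", "ZhangSan"), 2);
        System.out.println(s.executorMemory.keySet()); // [block-0, block-1, block-2]
        System.out.println(s.processBatch());          // 5
    }
}
```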

Procedure

Huawei Cloud MRS provides Spark sample development projects for multiple scenarios. The development approach for the scenario in this practice is as follows:

  1. Receive data from Kafka and generate the corresponding DStream.
  2. Group the word records by word.
  3. Compute the result and print it.
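The three development steps can be sketched in plain Java, with each list of strings standing in for the word records received in one batch. The class RunningWordCount is hypothetical; in Spark Streaming, keeping running totals across batches is typically done with a stateful operation such as updateStateByKey, and the sample project's actual code may differ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the three development steps; it does not use Spark.
class RunningWordCount {

    // Accumulated total per word across all batches processed so far.
    private final Map<String, Long> totals = new TreeMap<>();

    // Step 2: group the word records of one batch and count each word.
    static Map<String, Long> countBatch(List<String> records) {
        Map<String, Long> batchCounts = new TreeMap<>();
        for (String record : records) {
            String word = record.trim();
            if (!word.isEmpty()) {
                batchCounts.merge(word, 1L, Long::sum);
            }
        }
        return batchCounts;
    }

    // Step 3: add the batch counts to the running totals and return a snapshot to print.
    Map<String, Long> update(List<String> batchRecords) {
        countBatch(batchRecords).forEach((word, n) -> totals.merge(word, n, Long::sum));
        return new TreeMap<>(totals);
    }

    public static void main(String[] args) {
        RunningWordCount wc = new RunningWordCount();
        // Step 1 stand-in: each list plays the role of the records in one batch.
        System.out.println(wc.update(Arrays.asList("Tom", "LiSi", "Tom"))); // {LiSi=1, Tom=2}
        System.out.println(wc.update(Arrays.asList("LiSi", "LinDa")));      // {LiSi=2, LinDa=1, Tom=2}
    }
}
```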

Step 1: Creating an MRS Cluster

  1. Create and purchase an MRS cluster that contains the Spark2x and Kafka components. For details, see Buying a Custom Cluster.

    NOTE:

    In this practice, an MRS 3.1.0 cluster with Kerberos authentication disabled is used as an example.

  2. After the cluster is purchased, install the cluster client on any node of the cluster. For details, see Installing and Using the Cluster Client.

    Assume that the client is installed in /opt/client.

Step 2: Preparing Applications

  1. Obtain the sample project from Huawei Mirrors.

    Download the Maven project source code and configuration files of the sample project, and configure related development tools on the local host. For details, see Obtaining Sample Projects from Huawei Mirrors.

    Select a sample project based on the cluster version and download the sample project.

    For example, to obtain SparkStreamingKafka010JavaExample, visit https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/spark-examples/sparknormal-examples/SparkStreamingKafka010JavaExample.

  2. Use IntelliJ IDEA to import the sample project and wait for Maven to download the dependency packages. For details, see Configuring and Importing Sample Projects.

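    In this sample project, Spark Streaming calls the Kafka API to obtain word records and groups them to count the records of each word. The following key code snippet is the producer program (StreamingExampleProducer), which reads the source data files in /home/data/ and sends each non-blank line to Kafka as one record: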

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class StreamingExampleProducer {
        public static void main(String[] args) throws IOException {
            if (args.length < 2) {
                printUsage();
                return;    // args[0] and args[1] are required below
            }
            String brokerList = args[0];
            String topic = args[1];
            String filePath = "/home/data/";    // Directory containing the source data files
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // try-with-resources closes the producer (flushing pending records) when the loop ends
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int m = 0; m < Integer.MAX_VALUE / 2; m++) {
                    File[] files = new File(filePath).listFiles();
                    if (files != null) {
                        for (File file : files) {
                            if (file.isDirectory()) {
                                System.out.println(file.getName() + " is a directory. Skipping it.");
                                continue;
                            }
                            // try-with-resources closes the reader even if readLine() throws
                            try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                                String line;
                                while ((line = reader.readLine()) != null) {
                                    // Skip blank lines; send every other line as one record
                                    if (!line.isEmpty()) {
                                        producer.send(new ProducerRecord<String, String>(topic, line));
                                    }
                                }
                            }
                        }
                    }
                    try {
                        Thread.sleep(3);    // Pause 3 ms before the next pass over the data directory
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();    // Restore the interrupt flag and stop
                        break;
                    }
                }
            }
        }
    
        private static void printUsage() {
            System.out.println("Usage: {brokerList} {topic}");
        }
    }

  3. After Maven and the SDK are configured on the local host, the sample project automatically loads the related dependency packages. After the loading is complete, double-click package under Lifecycle in the Maven tool window to build the JAR file.

    For example, the packaged JAR file is SparkStreamingKafka010JavaExample-1.0.jar.
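The same JAR also contains the consumer class com.huawei.bigdata.spark.examples.KafkaWordCount, which is started with spark-submit in Step 4 and takes four arguments: <checkpointDir> <brokers> <topic> <batchTime>. The following sketch shows one way to validate those arguments and convert <batchTime> from seconds to milliseconds. KafkaWordCountArgs is a hypothetical class; the sample's own argument handling may differ.

```java
// Hypothetical holder for the KafkaWordCount arguments:
// <checkpointDir> <brokers> <topic> <batchTime>
class KafkaWordCountArgs {
    final String checkpointDir;  // HDFS checkpoint path, for example /tmp
    final String brokers;        // Broker address, for example 192.168.0.131:9092
    final String topic;          // Kafka topic to read from
    final long batchIntervalMs;  // <batchTime> converted from seconds to milliseconds

    private KafkaWordCountArgs(String checkpointDir, String brokers, String topic,
                               long batchIntervalMs) {
        this.checkpointDir = checkpointDir;
        this.brokers = brokers;
        this.topic = topic;
        this.batchIntervalMs = batchIntervalMs;
    }

    static KafkaWordCountArgs parse(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "Usage: KafkaWordCount <checkpointDir> <brokers> <topic> <batchTime>");
        }
        long seconds;
        try {
            seconds = Long.parseLong(args[3].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("batchTime must be an integer number of seconds", e);
        }
        if (seconds <= 0) {
            throw new IllegalArgumentException("batchTime must be positive");
        }
        return new KafkaWordCountArgs(args[0], args[1], args[2], seconds * 1000L);
    }

    public static void main(String[] args) {
        KafkaWordCountArgs a = parse(
                new String[] {"/tmp", "192.168.0.131:9092", "sparkkafka", "5"});
        System.out.println(a.topic + ", batch interval " + a.batchIntervalMs + " ms");
        // sparkkafka, batch interval 5000 ms
    }
}
```

With a <batchTime> of 5, the batch timestamps in the job output advance by 5000 ms, which matches this conversion.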

Step 3: Uploading the JAR Package and Source Data

  1. Prepare the source data to be sent to Kafka, for example, the following input_data.txt file. Upload the file to the /home/data directory on the client node.

    ZhangSan
    LiSi
    WangwWU
    Tom
    Jemmmy
    LinDa

  2. Upload the compiled JAR package to a directory, for example, /opt, on the client node.

    NOTE:

    If you cannot directly connect to the client node to upload files through the local network, upload the JAR file or source data to OBS, import the file to HDFS on the Files tab page of the MRS cluster, and run the hdfs dfs -get command on the HDFS client to download the file to the client node.

Step 4: Running the Job and Viewing the Result

  1. Log in to the node where the cluster client is installed as user root and configure the client environment variables:

    cd /opt/client

    source bigdata_env

  2. Create a Kafka topic for receiving data.

    kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 2 --partitions 3 --topic Topic name

    To query the IP address of the quorumpeer instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > ZooKeeper, and click the Instance tab. If there are multiple quorumpeer instances, separate the IP address:port pairs with commas (,). The ZooKeeper client connection port is specified by the ZooKeeper service configuration parameter clientPort. The default value is 2181.

    For example, run the following command:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka

    Created topic sparkkafka.

  3. After the topic is created, execute the program to send data to Kafka.

    java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer IP address of the Broker instance:Kafka connection port Topic name

    To query the IP address of the Kafka Broker instance, log in to FusionInsight Manager of the cluster, choose Cluster > Services > Kafka, and click the Instance tab. If there are multiple Broker instances, separate the IP address:port pairs with commas (,). The Kafka connection port is specified by the Kafka service configuration parameter port. The default value is 9092.

    For example, run the following command:

    java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka

    ...
    	transactional.id = null
    	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
    2022-06-08 15:43:42 INFO  AppInfoParser:117 - Kafka version: xxx
    2022-06-08 15:43:42 INFO  AppInfoParser:118 - Kafka commitId: xxx
    2022-06-08 15:43:42 INFO  AppInfoParser:119 - Kafka startTimeMs: xxx
    2022-06-08 15:43:42 INFO  Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A

  4. Open a new client connection window and run the following commands to read data from the Kafka topic:

    cd /opt/client/Spark2x/spark

    source bigdata_env

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

    • <checkpointDir> indicates the HDFS path where the application backs up (checkpoints) its results, for example, /tmp.
    • <brokers> indicates the Kafka address for obtaining metadata, in the format of IP address of the Broker instance:Kafka connection port.
    • <topic> indicates the name of the Kafka topic to read from.
    • <batchTime> indicates the batch interval of the streaming job, in seconds, for example, 5.

    For example, run the following commands:

    cd /opt/client/Spark2x/spark

    source bigdata_env

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5

    After the program starts, the accumulated count of each word read from Kafka is printed for every batch.

    ....
    -------------------------------------------                                     
    Time: 1654674380000 ms
    -------------------------------------------
    (ZhangSan,6)
    (Tom,6)
    (LinDa,6)
    (WangwWU,6)
    (LiSi,6)
    (Jemmmy,6)
     
    -------------------------------------------                                     
    Time: 1654674385000 ms
    -------------------------------------------
    (ZhangSan,717)
    (Tom,717)
    (LinDa,717)
    (WangwWU,717)
    (LiSi,717)
    (Jemmmy,717)
     
    -------------------------------------------
    Time: 1654674390000 ms
    -------------------------------------------
    (ZhangSan,2326)
    (Tom,2326)
    (LinDa,2326)
    (WangwWU,2326)
    (LiSi,2326)
    (Jemmmy,2326)
     ...

  5. Log in to FusionInsight Manager and choose Cluster > Services > Spark2x.
  6. On the Dashboard tab page that is displayed, click the link next to Spark WebUI to access the History Server web UI.

    Click a job ID to view the status of the Spark Streaming job.
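The totals printed by KafkaWordCount in Step 4 are cumulative: each batch prints the running total of each word, not the count for that batch alone. The following sketch recovers the per-batch counts by subtracting consecutive totals, using the values printed for ZhangSan in the sample output. BatchIncrements is a hypothetical helper, and it assumes the total was 0 before the first batch shown.

```java
import java.util.Arrays;

// Hypothetical helper: derives per-batch record counts from cumulative totals.
class BatchIncrements {

    // increments[i] = totals[i] - totals[i - 1]; the first batch is measured from zero.
    static long[] fromCumulative(long[] totals) {
        long[] increments = new long[totals.length];
        long previous = 0;
        for (int i = 0; i < totals.length; i++) {
            if (totals[i] < previous) {
                throw new IllegalArgumentException("cumulative totals must not decrease");
            }
            increments[i] = totals[i] - previous;
            previous = totals[i];
        }
        return increments;
    }

    public static void main(String[] args) {
        // Totals printed for ZhangSan at 1654674380000, 1654674385000, and 1654674390000 ms.
        long[] totals = {6, 717, 2326};
        System.out.println(Arrays.toString(fromCumulative(totals))); // [6, 711, 1609]
    }
}
```

All six words show the same totals in each batch because the producer re-sends every line of input_data.txt on each pass over /home/data.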
