Using Spark Jar Jobs to Read and Query OBS Data

Updated on 2025-01-10 GMT+08:00

Scenario

DLI is fully compatible with open-source Apache Spark and allows you to import, query, analyze, and process job data programmatically. This section describes how to write a Spark program that reads and queries OBS data, how to compile and package the code, and how to submit it as a Spark Jar job.

Environment Preparations

Before you start, set up the development environment.

Table 1 Spark Jar job development environment

OS: Windows 7 or later
JDK: JDK 1.8
IntelliJ IDEA: Used for application development. Use version 2019.1 or another compatible version.
Maven: Used for project management throughout the software development lifecycle. A basic configuration of the development environment is sufficient.

Development Process

The following figure shows the process of developing a Spark Jar job.
Figure 1 Development process
Table 2 Process description

1. Create a queue for general use (DLI console): The DLI queue is created for running your job.
2. Upload data to an OBS bucket (OBS console): The test data needs to be uploaded to your OBS bucket.
3. Create a Maven project and configure the POM file (IntelliJ IDEA): Write your code by referring to the sample code for reading data from OBS.
4. Write code (IntelliJ IDEA).
5. Debug, compile, and package the code into a JAR file (IntelliJ IDEA).
6. Upload the JAR package to OBS and DLI (OBS console and DLI console): Upload the generated Spark JAR package to an OBS directory and to DLI package management.
7. Create a Spark Jar job (DLI console): The Spark Jar job is created and submitted on the DLI console.
8. Check the job execution result (DLI console): Check the job status and run logs.

Step 1: Create a Queue for General Purpose

Create a queue before submitting Spark jobs. In this example, we will create a general-purpose queue named sparktest.

  1. Log in to the DLI management console.
  2. In the navigation pane on the left, choose Resources > Resource Pool.
  3. On the displayed page, click Buy Resource Pool in the upper right corner.
  4. On the displayed page, set the parameters.
    In this example, we will buy the resource pool in the CN East-Shanghai2 region. Table 3 describes the parameters.
    Table 3 Parameters

    Region: Select the region where you want to buy the elastic resource pool. Example: CN East-Shanghai2
    Project: Project uniquely preset by the system for each region. Example: Default
    Name: Name of the elastic resource pool. Example: dli_resource_pool
    Specifications: Specifications of the elastic resource pool. Example: Standard
    CU Range: Maximum and minimum CUs allowed for the elastic resource pool. Example: 64-64
    CIDR Block: CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap with that of the data source. Once set, the CIDR block cannot be changed. Example: 172.16.0.0/19
    Enterprise Project: Select an enterprise project for the elastic resource pool. Example: default

  5. Click Buy.
  6. Click Submit.
  7. In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
  8. Set the basic parameters listed below.
    Table 4 Basic parameters for adding a queue

    Name: Name of the queue to add. Example: dli_queue_01
    Type: Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose.
    Engine: SQL queue engine. The options are Spark and HetuEngine.
    Enterprise Project: Select an enterprise project. Example: default

  9. Click Next and configure scaling policies for the queue.

    Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.

    Figure 2 shows the scaling policy configured in this example.
    Figure 2 Configuring a scaling policy when adding a queue
    Table 5 Scaling policy parameters

    Priority: Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. Example: 1
    Period: The first scaling policy is the default policy, and its Period configuration cannot be deleted or modified. The period for the scaling policy is from 00 to 24. Example: 00–24
    Min CU: Minimum number of CUs allowed by the scaling policy. Example: 16
    Max CU: Maximum number of CUs allowed by the scaling policy. Example: 64

  10. Click OK.

Step 2: Upload Data to OBS

  1. Create the people.json file containing the following content:
    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
  2. Log in to the OBS console. On the Buckets page, click the name of the OBS bucket you created. In this example, the bucket name is dli-test-obs01.
  3. On the Objects tab, click Upload Object. In the displayed dialog box, upload the people.json file to the root directory of the OBS bucket. (A programmatic alternative to the console upload is sketched after this list.)
  4. In the root directory of the OBS bucket, click Create Folder to create a folder and name it result.
  5. Click the result folder, click Create Folder on the displayed page to create a folder and name it parquet.
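
If you prefer not to use the console, the test file can also be uploaded programmatically. The following is a minimal sketch using the OBS Java SDK (esdk-obs-java must be added to your project); the endpoint URL, local file path, and environment variable names are illustrative assumptions, so adjust them to your bucket's region and environment.

    import java.io.File;
    import java.io.IOException;

    import com.obs.services.ObsClient;

    public class UploadPeopleJson {
        public static void main(String[] args) throws IOException {
            // Illustrative assumptions: endpoint of the bucket's region, local file path,
            // and credentials read from environment variables named OBS_AK / OBS_SK.
            String endpoint = "https://obs.cn-east-2.myhuaweicloud.com";
            String ak = System.getenv("OBS_AK");
            String sk = System.getenv("OBS_SK");

            ObsClient obsClient = new ObsClient(ak, sk, endpoint);
            try {
                // Upload people.json to the root directory of the bucket.
                obsClient.putObject("dli-test-obs01", "people.json", new File("D:\\DLITest\\people.json"));
            } finally {
                obsClient.close();
            }
        }
    }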

Step 3: Create a Maven Project and Configure the pom Dependency

This step uses IntelliJ IDEA 2020.2 as an example.
  1. Start IntelliJ IDEA and choose File > New > Project.
    Figure 3 Creating a project
  2. Choose Maven, set Project SDK to 1.8, and click Next.
    Figure 4 Creating a project
  3. Set the project name, configure the storage path, and click Finish.
    Figure 5 Creating a project

    In this example, the Maven project name is SparkJarObs, and the project storage path is D:\DLITest\SparkJarObs.

  4. Add the following content to the pom.xml file.
    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.2</version>
            </dependency>
    </dependencies>
    Figure 6 Modifying the pom.xml file
  5. Choose src > main and right-click the java folder. Choose New > Package to create a package and a class file.
    Figure 7 Creating a package

    Set the package name as you need. In this example, set Package to com.huawei.dli.demo. Then, press Enter.

    Create a Java Class file in the package path. In this example, the Java Class file is SparkDemoObs.
    Figure 8 Creating a Java class file
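
At this point the new class is empty. A minimal skeleton of what it should look like is shown below; the body is written in Step 4: Write Code.

    package com.huawei.dli.demo;

    public class SparkDemoObs {
        public static void main(String[] args) {
            // The logic for reading and querying OBS data is added in Step 4: Write Code.
        }
    }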

Step 4: Write Code

Code the SparkDemoObs program to read the people.json file from the OBS bucket, create the temporary table people, and query data.

For the sample code, see Sample Code.

  1. Import dependencies.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import static org.apache.spark.sql.functions.col;
  2. Create Spark session spark using the AK and SK of the current account.
    SparkSession spark = SparkSession
                    .builder()
                    .config("spark.hadoop.fs.obs.access.key", "xxx")
                    .config("spark.hadoop.fs.obs.secret.key", "yyy")
                    .appName("java_spark_demo")
                    .getOrCreate();
    • Replace xxx in "spark.hadoop.fs.obs.access.key" with the AK of the account.
    • Replace yyy in "spark.hadoop.fs.obs.secret.key" with the SK of the account.

    For details about how to obtain the AK and SK, see How Do I Obtain the AK/SK Pair?

  3. Read the people.json file from the OBS bucket.
    dli-test-obs01 is the name of the sample OBS bucket. Replace it with the actual OBS bucket name.
    Dataset<Row> df = spark.read().json("obs://dli-test-obs01/people.json");
    df.printSchema();
  4. Create temporary table people to read data.
    df.createOrReplaceTempView("people");
  5. Query data in the people table. (An additional query sketch follows this list.)
    Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
    sqlDF.show();
  6. Export people table data in Parquet format to the result/parquet directory of the OBS bucket.
    sqlDF.write().mode(SaveMode.Overwrite).parquet("obs://dli-test-obs01/result/parquet");
    spark.read().parquet("obs://dli-test-obs01/result/parquet").show();
  7. Stop the Spark session.
    spark.stop();
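
The temporary view people can be queried with any Spark SQL statement before the session is stopped. The following is a minimal illustrative sketch; the queries are assumptions added for demonstration, not part of the original sample, and should run before spark.stop().

    // Illustrative only: additional queries against the temporary view "people".
    // Average age of the people whose age is known.
    Dataset<Row> avgAge = spark.sql("SELECT AVG(age) AS avg_age FROM people WHERE age IS NOT NULL");
    avgAge.show();

    // People older than 21, expressed in SQL rather than through the DataFrame API.
    spark.sql("SELECT name, age FROM people WHERE age > 21").show();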

Step 5: Debug, Compile, and Package the Code into a JAR File

  1. Click Maven in the toolbar on the right, then double-click clean and compile to compile the code.
    After the compilation is successful, double-click package.
    Figure 9 Compiling and packaging
    The generated JAR package is stored in the target directory. In this example, SparkJarObs-1.0-SNAPSHOT.jar is stored in D:\DLITest\SparkJarObs\target.
    Figure 10 Exporting the JAR file

Step 6: Upload the JAR Package to OBS and DLI

  • Spark 3.3 or later:

    You only need to set the Application parameter when creating the Spark job and select the required JAR file from OBS.

    1. Log in to the OBS console and upload the JAR file to the OBS path.
    2. Log in to the DLI console. In the navigation pane, choose Job Management > Spark Jobs.
    3. Locate the row containing a desired job and click Edit in the Operation column.
    4. Set Application to the OBS path used in step 1.
      Figure 11 Configuring the application
  • Versions earlier than Spark 3.3:

    Upload the JAR file to OBS and DLI.

    1. Log in to the OBS console and upload the JAR file to the OBS path.
    2. Upload the file to DLI for package management.
      1. Log in to the DLI management console and choose Data Management > Package Management.
      2. On the Package Management page, click Create in the upper right corner.
      3. In the Create Package dialog, set the following parameters:
        1. Type: Select JAR.
        2. OBS Path: Specify the OBS path for storing the package.
        3. Set Group and Group Name as required for package identification and management.
      4. Click OK.
        Figure 12 Creating a package

Step 7: Create a Spark Jar Job

  1. Log in to the DLI console. In the navigation pane, choose Job Management > Spark Jobs.
  2. On the Spark Jobs page, click Create Job.
  3. On the displayed page, configure the following parameters:
    • Queue: Select the created queue. For example, select the queue sparktest created in Step 1: Create a Queue for General Purpose.
    • Spark Version: Select a supported Spark version from the drop-down list. The latest version is recommended.
    • Job Name (--name): Name of the Spark Jar job. For example, SparkTestObs.
    • Application: Select the package uploaded in Step 6: Upload the JAR Package to OBS and DLI. For example, select SparkJarObs-1.0-SNAPSHOT.jar.
    • Main Class (--class): The format is program package name + class name. For example, com.huawei.dli.demo.SparkDemoObs.

    You do not need to set other parameters.

    For more information about Spark JAR job submission, see Creating a Spark Job.

  4. Click Execute to submit the Spark Jar job. On the Spark Jobs page, check the status of the job you submitted.
    Figure 13 Job status

Step 8: Check Job Execution Result

  1. On the Spark Jobs page, check the status of the job you submitted. The initial status is Starting.
  2. If the job is successfully executed, the job status is Finished. Click More in the Operation column and select Driver Logs to check the run log.
    Figure 14 Selecting Driver Logs
    Figure 15 Driver logs
  3. If the job is successfully executed, go to the result/parquet directory in the OBS bucket to check the generated parquet file.
  4. If the job fails to be executed, click More in the Operation column and select Driver Logs to check detailed error information.
    For example, the following figure shows the error reported when the package path was not included in the main class name during Spark Jar job creation.
    Figure 16 Error information

    In the Operation column, click Edit, change the value of Main Class to com.huawei.dli.demo.SparkDemoObs, and click Execute to run the job again.

Follow-up Guide

Sample Code

NOTE:

Hard-coded or plaintext access.key and secret.key pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed. A sketch that reads the credentials from environment variables is provided after the sample code.

package com.huawei.dli.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class SparkDemoObs {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .config("spark.hadoop.fs.obs.access.key", "xxx")
                .config("spark.hadoop.fs.obs.secret.key", "yyy")
                .appName("java_spark_demo")
                .getOrCreate();
        // The AK/SK can also be set with --conf when submitting the application.

        // test json data:
        // {"name":"Michael"}
        // {"name":"Andy", "age":30}
        // {"name":"Justin", "age":19}
        Dataset<Row> df = spark.read().json("obs://dli-test-obs01/people.json");
        df.printSchema();
        // root
        // |-- age: long (nullable = true)
        // |-- name: string (nullable = true)

        // Displays the content of the DataFrame to stdout
        df.show();
        // +----+-------+
        // | age|   name|
        // +----+-------+
        // |null|Michael|
        // |  30|   Andy|
        // |  19| Justin|
        // +----+-------+

        // Select only the "name" column
        df.select("name").show();
        // +-------+
        // |   name|
        // +-------+
        // |Michael|
        // |   Andy|
        // | Justin|
        // +-------+

        // Select people older than 21
        df.filter(col("age").gt(21)).show();
        // +---+----+
        // |age|name|
        // +---+----+
        // | 30|Andy|
        // +---+----+

        // Count people by age
        df.groupBy("age").count().show();
        // +----+-----+
        // | age|count|
        // +----+-----+
        // |  19|    1|
        // |null|    1|
        // |  30|    1|
        // +----+-----+

        // Register the DataFrame as a SQL temporary view
        df.createOrReplaceTempView("people");

        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
        sqlDF.show();
        // +----+-------+
        // | age|   name|
        // +----+-------+
        // |null|Michael|
        // |  30|   Andy|
        // |  19| Justin|
        // +----+-------+

        sqlDF.write().mode(SaveMode.Overwrite).parquet("obs://dli-test-obs01/result/parquet");
        spark.read().parquet("obs://dli-test-obs01/result/parquet").show();

        spark.stop();
    }
}
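
As the note above recommends, avoid hard-coding the AK and SK. The following is a minimal sketch of building the session from environment variables instead; the variable names DLI_AK and DLI_SK are illustrative assumptions, not DLI conventions.

// Minimal sketch: read the credentials from environment variables instead of hard-coding them.
// DLI_AK and DLI_SK are illustrative names; export them in the environment that runs the application.
String ak = System.getenv("DLI_AK");
String sk = System.getenv("DLI_SK");

SparkSession spark = SparkSession
        .builder()
        .config("spark.hadoop.fs.obs.access.key", ak)
        .config("spark.hadoop.fs.obs.secret.key", sk)
        .appName("java_spark_demo")
        .getOrCreate();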
