Using the Spark Job to Access DLI Metadata
Scenario
DLI allows you to develop programs that create Spark jobs to operate on databases, DLI or OBS tables, and table data. This example shows how to write a Java program and submit it as a Spark job that creates a database and tables and inserts data into them.
 
 
   This feature is currently in beta testing. If you want to use it, submit a service ticket to apply for permission to access DLI metadata using Spark jobs.
Constraints
- You must create a queue to use Spark 3.1 for metadata access.
- To use Spark 3.3.1 to access metadata, you need to create a custom agency credential and add agency information to the job configuration.
    For details about how to create a custom agency, see Creating a DLI Custom Agency. 
- The following cases are not supported:
    - If you create a database with a SQL job, you cannot write a program to create tables in that database.
      For example, if the testdb database is created in the DLI SQL editor, a program package that creates the testTable table in testdb does not work after it is submitted as a Spark Jar job. 
 
- The following cases are supported:
    - You can create databases and tables in a SQL job, and read and insert data using SQL statements or a Spark program.
    - You can create databases and tables in a Spark job, and read and insert data using SQL statements or a Spark program.
 
Environment Preparations
Before developing a Spark job to access DLI metadata, set up a development environment that meets the following requirements.
| Item | Description | 
|---|---|
| OS | Windows 7 or later | 
| JDK | JDK 1.8 | 
| IntelliJ IDEA | Integrated development environment used for application development. Use version 2019.1 or a compatible version. | 
| Maven | Build tool for the development environment. Maven manages the project throughout the software development lifecycle. | 
Development Process
 
| No. | Phase | Software Portal | Description | 
|---|---|---|---|
| 1 | Create a queue for general use. | DLI console | Create the DLI queue that runs your job. | 
| 2 | Configure the OBS file. | OBS console | Upload the test data file to an OBS bucket and create the folder that stores DLI metadata. | 
| 3 | Create a Maven project and configure the POM file. | IntelliJ IDEA | Create the project and add the Spark dependency. | 
| 4 | Write code. | IntelliJ IDEA | Write a program to create a DLI or OBS table by referring to the sample code. | 
| 5 | Debug, compile, and pack the code into a Jar package. | IntelliJ IDEA | Compile the code and package it into a JAR file. | 
| 6 | Upload the Jar package to OBS and DLI. | OBS console, DLI console | Upload the generated Spark JAR package to an OBS directory and, for Spark versions earlier than 3.3, to DLI package management. | 
| 7 | Create a Spark JAR job. | DLI console | Create and submit the Spark Jar job on the DLI console. | 
| 8 | Check execution result of the job. | DLI console | Check the job status and run logs. | 
Step 1: Create a Queue for General Purpose
Create a queue before submitting Spark jobs. Queues are added to an elastic resource pool, so this example first buys a resource pool and then adds a general-purpose queue named sparktest to it.
- Log in to the DLI management console.
- In the navigation pane on the left, choose Resources > Resource Pool.
- On the displayed page, click Buy Resource Pool in the upper right corner.
- On the displayed page, set the parameters.
    In this example, we will buy the resource pool in the CN East-Shanghai2 region. Table 3 describes the parameters.
    Table 3 Parameter descriptions
    | Parameter | Description | Example Value |
    |---|---|---|
    | Region | Select a region where you want to buy the elastic resource pool. | CN East-Shanghai2 |
    | Project | Project uniquely preset by the system for each region | Default |
    | Name | Name of the elastic resource pool | dli_resource_pool |
    | Specifications | Specifications of the elastic resource pool | Standard |
    | CU Range | The maximum and minimum CUs allowed for the elastic resource pool | 64-64 |
    | CIDR Block | CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. | 172.16.0.0/19 |
    | Enterprise Project | Select an enterprise project for the elastic resource pool. | default |
- Click Buy.
- Click Submit.
- In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
- Set the basic parameters listed below.
    Table 4 Basic parameters for adding a queue
    | Parameter | Description | Example Value |
    |---|---|---|
    | Name | Name of the queue to add | sparktest |
    | Type | Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose. | For general purpose |
    | Engine | SQL queue engine. The options are Spark and HetuEngine. | Spark |
    | Enterprise Project | Select an enterprise project. | default |
- Click Next and configure scaling policies for the queue.
    Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs. Figure 2 shows the scaling policy configured in this example.
    Table 5 Scaling policy parameters
    | Parameter | Description | Example Value |
    |---|---|---|
    | Priority | Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. | 1 |
    | Period | The first scaling policy is the default policy, and its Period parameter configuration cannot be deleted or modified. The period for the scaling policy is from 00 to 24. | 00–24 |
    | Min CU | Minimum number of CUs allowed by the scaling policy | 16 |
    | Max CU | Maximum number of CUs allowed by the scaling policy | 64 |
- Click OK.
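The CIDR Block parameter in Table 3 must not overlap the CIDR block of any data source you plan to reach through an enhanced datasource connection, and it cannot be changed later. The following is a minimal JDK-only Java sketch for checking two IPv4 CIDR blocks for overlap before you buy the pool. The class and method names are hypothetical, and the data source CIDR 172.16.8.0/24 in main is an assumed example value.

```java
// Hypothetical helper: checks whether two IPv4 CIDR blocks overlap.
final class CidrOverlapCheck {

    /** Returns {first, last} address of the block as unsigned 32-bit values held in longs. */
    static long[] range(String cidr) {
        String[] parts = cidr.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected a.b.c.d/prefix: " + cidr);
        }
        String[] octets = parts[0].split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("Expected four octets: " + cidr);
        }
        long address = 0;
        for (String octet : octets) {
            int value = Integer.parseInt(octet);
            if (value < 0 || value > 255) {
                throw new IllegalArgumentException("Octet out of range: " + cidr);
            }
            address = (address << 8) | value;
        }
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Prefix out of range: " + cidr);
        }
        long size = 1L << (32 - prefix);    // number of addresses in the block
        long first = address & ~(size - 1); // clear the host bits
        return new long[] {first, first + size - 1};
    }

    /** Two ranges overlap if each one starts no later than the other ends. */
    static boolean overlaps(String cidrA, String cidrB) {
        long[] a = range(cidrA);
        long[] b = range(cidrB);
        return a[0] <= b[1] && b[0] <= a[1];
    }

    public static void main(String[] args) {
        String poolCidr = "172.16.0.0/19";       // example value from Table 3
        String dataSourceCidr = "172.16.8.0/24"; // assumed data source CIDR
        System.out.println("Overlap: " + overlaps(poolCidr, dataSourceCidr));
    }
}
```

If the check reports an overlap, choose a different pool CIDR block before clicking Buy.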
Step 2: Configure the OBS Bucket File
- To create an OBS table, upload data to the OBS bucket directory.
- Log in to the OBS console. On the Buckets page, click the name of the OBS bucket you created. In this example, the bucket name is dli-test-obs01.
- On the Objects tab, click Upload Object. In the displayed dialog box, upload the testdata.csv file to the root directory of the OBS bucket.
- In the root directory of the OBS bucket, click Create Folder to create a folder named warehousepath. This folder stores DLI metadata; you will specify it as spark.sql.warehouse.dir when creating the Spark job.
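The OBS table created in Step 4 is declared as (age INT, name STRING) using csv, and the statement sets no header option, so Spark reads every line of testdata.csv as data (the CSV reader's header option defaults to false). If you do not already have a test file, the following JDK-only Java sketch writes one with two comma-separated columns and no header row. The class name is hypothetical, and the rows are made-up sample values, not data from this guide.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: writes a sample testdata.csv matching (age INT, name STRING).
final class TestDataWriter {

    static Path write(Path target, List<String[]> rows) throws IOException {
        StringBuilder csv = new StringBuilder();
        for (String[] row : rows) {
            // Column order must match the table definition: age first, then name.
            csv.append(row[0]).append(',').append(row[1]).append('\n');
        }
        // No header line: the CREATE TABLE statement does not set header=true.
        return Files.write(target, csv.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        List<String[]> rows = Arrays.asList(
                new String[] {"25", "alice"}, // hypothetical sample rows
                new String[] {"31", "bob"});
        Path file = write(Paths.get("testdata.csv"), rows);
        System.out.println("Wrote " + file.toAbsolutePath());
    }
}
```

Upload the generated file to the bucket root as described above.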
Step 3: Create a Maven Project and Configure the POM Dependency
- Start IntelliJ IDEA and choose File > New > Project.
     Figure 3 Creating a project  
- Choose Maven, set Project SDK to 1.8, and click Next.
     Figure 4 Selecting an SDK  
- Set the project name, configure the storage path, and click Finish.
     Figure 5 Creating a project
    In this example, the Maven project name is SparkJarMetadata, and the project storage path is D:\DLITest\SparkJarMetadata. 
- Add the following content to the pom.xml file.
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
    </dependencies>
     Figure 6 Modifying the pom.xml file
- Choose src > main and right-click the java folder. Choose New > Package to create a package and a class file.
     Figure 7 Creating a package
    Set the package name as you need. In this example, set Package to com.huawei.dli.demo and press Enter.
    Create a Java Class file in the package path. In this example, the Java Class file is DliCatalogTest.
     Figure 8 Creating a Java class file
Step 4: Write Code
Write the DliCatalogTest program to create a database, DLI table, and OBS table.
For the sample code, see Java Example Code.
- Import the SparkSession class.
    import org.apache.spark.sql.SparkSession; 
- Create a SparkSession instance.
    When you create a SparkSession, specify the spark.sql.session.state.builder and spark.sql.catalog.class parameters and, except for Spark 3.3.x, the spark.sql.extensions parameter. Spark 2.4.x additionally requires spark.sql.hive.implementation. Use the example that matches your Spark version.
    - Spark 2.3.x
          SparkSession spark = SparkSession
                  .builder()
                  .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
                  .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
                  .config("spark.sql.extensions", "org.apache.spark.sql.DliSparkExtension")
                  .appName("java_spark_demo")
                  .getOrCreate();
    - Spark 2.4.x
          SparkSession spark = SparkSession
                  .builder()
                  .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
                  .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
                  .config("spark.sql.extensions", "org.apache.spark.sql.DliSparkExtension")
                  .config("spark.sql.hive.implementation", "org.apache.spark.sql.hive.client.DliHiveClientImpl")
                  .appName("java_spark_demo")
                  .getOrCreate();
    - Spark 3.1.x
          SparkSession spark = SparkSession
                  .builder()
                  .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
                  .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
                  .config("spark.sql.extensions", "org.apache.spark.sql.DliSparkExtension")
                  .appName("java_spark_demo")
                  .getOrCreate();
    - Spark 3.3.x
          SparkSession spark = SparkSession
                  .builder()
                  .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.DliLakeHouseBuilder")
                  .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.DliLakeHouseCatalog")
                  .appName("java_spark_demo")
                  .getOrCreate();
 
- Create a database.
    spark.sql("create database if not exists test_sparkapp").collect();
- Create a DLI table and insert test data.
    spark.sql("drop table if exists test_sparkapp.dli_testtable").collect();
    spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect();
    spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect();
    spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect();
- Create an OBS table. Replace the OBS path in the following example with the path of the file you uploaded in Step 2: Configure the OBS Bucket File.
    spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect();
    spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect();
- Stop the SparkSession.
    spark.stop(); 
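The session settings above differ only by Spark version. The following JDK-only Java sketch collects them in one lookup, which can help if you maintain one program for several Spark versions. The class name and the version-prefix matching are assumptions for illustration; the configuration keys and class names are copied from the examples above. In a real job you would apply each entry with SparkSession.builder().config(key, value).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: returns the SparkSession settings listed above for a Spark version.
final class DliSessionConfigs {

    private static final String STATE_BUILDER = "spark.sql.session.state.builder";
    private static final String CATALOG_CLASS = "spark.sql.catalog.class";
    private static final String EXTENSIONS = "spark.sql.extensions";

    static Map<String, String> forVersion(String sparkVersion) {
        Map<String, String> conf = new LinkedHashMap<>();
        if (sparkVersion.startsWith("3.3")) {
            // Spark 3.3.x uses the lakehouse builder and catalog; no extension is set.
            conf.put(STATE_BUILDER, "org.apache.spark.sql.hive.DliLakeHouseBuilder");
            conf.put(CATALOG_CLASS, "org.apache.spark.sql.hive.DliLakeHouseCatalog");
        } else if (sparkVersion.startsWith("2.3") || sparkVersion.startsWith("2.4")
                || sparkVersion.startsWith("3.1")) {
            conf.put(STATE_BUILDER, "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder");
            conf.put(CATALOG_CLASS, "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog");
            conf.put(EXTENSIONS, "org.apache.spark.sql.DliSparkExtension");
            if (sparkVersion.startsWith("2.4")) {
                // Only Spark 2.4.x also sets the DLI Hive client implementation.
                conf.put("spark.sql.hive.implementation",
                        "org.apache.spark.sql.hive.client.DliHiveClientImpl");
            }
        } else {
            throw new IllegalArgumentException("No documented settings for Spark " + sparkVersion);
        }
        return Collections.unmodifiableMap(conf);
    }

    public static void main(String[] args) {
        // Print the settings for the version given on the command line (default 3.1).
        String version = args.length > 0 ? args[0] : "3.1";
        forVersion(version).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```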
Step 5: Debug, Compile, and Pack the Code into a Jar Package
- Open the Maven tool window on the right side of IntelliJ IDEA, and double-click clean and then compile to compile the code.
    After the compilation is successful, double-click package.
     Figure 9 Compiling and packaging
    The generated JAR package is stored in the target directory. In this example, SparkJarMetadata-1.0-SNAPSHOT.jar is stored in D:\DLITest\SparkJarMetadata\target.
     Figure 10 Exporting the JAR file
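Before uploading the package, you can confirm that it contains the class you will enter as Main Class in Step 7. A minimal JDK-only sketch follows; the class name JarMainClassCheck is hypothetical, and the default JAR path and class name are the example values used in this guide.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks that a JAR contains a given fully qualified class.
final class JarMainClassCheck {

    static boolean containsClass(String jarPath, String className) throws IOException {
        // com.huawei.dli.demo.DliCatalogTest -> com/huawei/dli/demo/DliCatalogTest.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jar = args.length > 0 ? args[0]
                : "D:\\DLITest\\SparkJarMetadata\\target\\SparkJarMetadata-1.0-SNAPSHOT.jar";
        String mainClass = args.length > 1 ? args[1] : "com.huawei.dli.demo.DliCatalogTest";
        System.out.println(mainClass + " found: " + containsClass(jar, mainClass));
    }
}
```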
Step 6: Upload the JAR Package to OBS and DLI
- Spark 3.3 or later:
    The Application parameter of the Spark job can only be set to a JAR file stored in OBS, so upload the package to OBS only.
    - Log in to the OBS console and upload the JAR file to an OBS path.
    - Log in to the DLI console. In the navigation pane, choose Job Management > Spark Jobs.
    - Locate the row containing the desired job and click Edit in the Operation column.
    - Set Application to the OBS path of the JAR file uploaded in the first step.
       Figure 11 Configuring the application
 
- Versions earlier than Spark 3.3:
    Upload the JAR file to OBS and DLI.
    - Log in to the OBS console and upload the JAR file to an OBS path.
    - Upload the file to DLI for package management.
        - Log in to the DLI management console and choose Data Management > Package Management.
        - On the Package Management page, click Create in the upper right corner.
        - In the Create Package dialog, set the following parameters:
            - Type: Select JAR.
            - OBS Path: Specify the OBS path for storing the package.
            - Set Group and Group Name as required for package identification and management.
        - Click OK.
           Figure 12 Creating a package
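The choice between the two procedures above depends only on the Spark version you will run. As a sketch, the rule can be written as a small JDK-only Java function. The enum and method names are hypothetical, and the parsing assumes dotted numeric version strings such as 3.3.1.

```java
// Hypothetical helper: decides how the JAR reaches the Spark job, per the rules above.
final class UploadPlan {

    enum Target {
        OBS_ONLY,            // Spark 3.3 or later: set Application to the OBS path
        OBS_AND_DLI_PACKAGE  // earlier versions: upload to OBS, then create a DLI package
    }

    static Target forVersion(String sparkVersion) {
        String[] parts = sparkVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        boolean atLeast33 = major > 3 || (major == 3 && minor >= 3);
        return atLeast33 ? Target.OBS_ONLY : Target.OBS_AND_DLI_PACKAGE;
    }

    public static void main(String[] args) {
        for (String version : new String[] {"2.4.5", "3.1.1", "3.3.1"}) {
            System.out.println(version + " -> " + forVersion(version));
        }
    }
}
```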
 
 
Step 7: Create a Spark Jar Job
- Log in to the DLI console. In the navigation pane, choose Job Management > Spark Jobs.
- On the Spark Jobs page, click Create Job in the upper right corner.
- On the job creation page, set the job parameters.
    Table 6 describes the parameters. Retain the default values for other parameters.
    Table 6 Spark Jar job parameters
    | Parameter | Value |
    |---|---|
    | Queue | Select the DLI queue created for general purpose. For example, select the queue sparktest created in Step 1: Create a Queue for General Purpose. |
    | Spark Version | Select a supported Spark version from the drop-down list. The latest version is recommended. |
    | Job Name (--name) | Name of a custom Spark Jar job. For example, SparkTestMeta. |
    | Application | Select the package uploaded to DLI in Step 6: Upload the JAR Package to OBS and DLI. For example, select SparkJarMetadata-1.0-SNAPSHOT.jar. For Spark 3.3 or later, select the JAR file in OBS. |
    | Main Class (--class) | The format is package name + class name. For example, com.huawei.dli.demo.DliCatalogTest. |
    | Spark Arguments (--conf) | Two settings: spark.dli.metaAccess.enable=true and spark.sql.warehouse.dir=obs://dli-test-obs01/warehousepath. NOTE: Set spark.sql.warehouse.dir to the OBS path that is specified in Step 2: Configure the OBS Bucket File. |
    | Access Metadata | Select Yes. |
- Click Execute to submit the Spark Jar job. On the Spark Jobs page, check the status of the job you submitted.
    Figure 13 Checking the job execution status  
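The labels in Table 6 name the corresponding spark-submit options (--name, --class, --conf). For readers who think in spark-submit terms, the following JDK-only Java sketch assembles the example values from Table 6 into that argument list and checks that spark.sql.warehouse.dir points to OBS. This only illustrates the mapping; the DLI console builds and submits the job itself, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: Table 6 values expressed as spark-submit style arguments.
final class SparkJobArgs {

    static List<String> build(String jobName, String mainClass, Map<String, String> conf) {
        String warehouse = conf.get("spark.sql.warehouse.dir");
        if (warehouse != null && !warehouse.startsWith("obs://")) {
            throw new IllegalArgumentException("spark.sql.warehouse.dir must be an OBS path: " + warehouse);
        }
        List<String> result = new ArrayList<>(Arrays.asList("--name", jobName, "--class", mainClass));
        // Each Spark Arguments entry becomes one --conf key=value pair.
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            result.add("--conf");
            result.add(entry.getKey() + "=" + entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.dli.metaAccess.enable", "true");
        conf.put("spark.sql.warehouse.dir", "obs://dli-test-obs01/warehousepath");
        System.out.println(String.join(" ",
                build("SparkTestMeta", "com.huawei.dli.demo.DliCatalogTest", conf)));
    }
}
```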
Step 8: Check Job Execution Result
- On the Spark Jobs page, check the status of the job you submitted. The initial status is Starting.
- If the job is successfully executed, the job status is Finished. Perform the following operations to check the created database and table:
    - On the DLI console, choose SQL Editor in the left navigation pane. The created database test_sparkapp is displayed in the database list.
      Figure 14 Checking the created database  
- Double-click the database name to check the created DLI and OBS tables in the database.
      Figure 15 Checking a table  
- Double-click dli_testtable and click Execute to query data in the table.
      Figure 16 Querying data in the table  
- Comment out the statement for querying the DLI table, double-click the OBS table dli_testobstable, and click Execute to query the OBS table data.
      Figure 17 Querying data in the OBS table  
 
- If the job fails, the job status is Failed. Click More in the Operation column and select Driver Logs to check the run log.
     Figure 18 Checking logs
    After the fault is rectified, click Edit in the Operation column of the job, modify job parameters, and click Execute to run the job again. 
Follow-up Guide
- If you want to use a Spark Jar job to access other data sources, see Using Spark Jobs to Access Data Sources of Datasource Connections.
- For details about the syntax for creating a DLI table, see Creating a DLI Table. For details about the syntax for creating an OBS table, see Creating an OBS Table.
- If you submit the job by calling an API, perform the following operations:
    Call the API for creating a batch processing job. For details about the API parameters, see Creating a Batch Processing Job.
    - Set catalog_name in the request to dli.
    - Add "spark.dli.metaAccess.enable":"true" to the conf parameter of the request.
      If you need to run DDL statements, also add "spark.sql.warehouse.dir": "obs://bucket/warehousepath" to the conf parameter. The following is a complete example request:
      {
          "queue": "citest",
          "file": "SparkJarMetadata-1.0-SNAPSHOT.jar",
          "className": "DliCatalogTest",
          "conf": {
              "spark.sql.warehouse.dir": "obs://bucket/warehousepath",
              "spark.dli.metaAccess.enable": "true"
          },
          "sc_type": "A",
          "executorCores": 1,
          "numExecutors": 6,
          "executorMemory": "4G",
          "driverCores": 2,
          "driverMemory": "7G",
          "catalog_name": "dli"
      }
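If you build the request body in code rather than by hand, the following JDK-only Java sketch assembles the example request above. It uses plain string building with minimal escaping so that no particular JSON library is assumed; the class and method names are hypothetical, and the values are the example values from the request above. Sending the body over HTTP is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the example batch job request body without a JSON library.
final class BatchJobRequest {

    /** Quotes a string as a JSON string literal, escaping quotes, backslashes and control chars. */
    static String quote(String s) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.append('"').toString();
    }

    /** Serializes a map: Integer values unquoted, nested maps recursively, others as strings. */
    @SuppressWarnings("unchecked")
    static String toJson(Map<String, ?> fields) {
        StringBuilder out = new StringBuilder("{");
        for (Map.Entry<String, ?> e : fields.entrySet()) {
            if (out.length() > 1) {
                out.append(',');
            }
            out.append(quote(e.getKey())).append(':');
            Object value = e.getValue();
            if (value instanceof Integer) {
                out.append(value);
            } else if (value instanceof Map) {
                out.append(toJson((Map<String, ?>) value));
            } else {
                out.append(quote(String.valueOf(value)));
            }
        }
        return out.append('}').toString();
    }

    /** The example request from this section; replace the values with your own. */
    static Map<String, Object> exampleRequest() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.sql.warehouse.dir", "obs://bucket/warehousepath"); // needed to run DDL
        conf.put("spark.dli.metaAccess.enable", "true");                   // enables metadata access
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("queue", "citest");
        body.put("file", "SparkJarMetadata-1.0-SNAPSHOT.jar");
        body.put("className", "DliCatalogTest");
        body.put("conf", conf);
        body.put("sc_type", "A");
        body.put("executorCores", 1);
        body.put("numExecutors", 6);
        body.put("executorMemory", "4G");
        body.put("driverCores", 2);
        body.put("driverMemory", "7G");
        body.put("catalog_name", "dli"); // must be dli
        return body;
    }

    public static void main(String[] args) {
        System.out.println(toJson(exampleRequest()));
    }
}
```

In production code a JSON library is usually the better choice; the hand-written serializer here only keeps the sketch dependency-free.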
 
Java Example Code
This example uses Java for coding. The complete sample code is as follows:
package com.huawei.dli.demo;

import org.apache.spark.sql.SparkSession;

public class DliCatalogTest {
    public static void main(String[] args) {
        // Session settings for Spark 2.3.x and 3.1.x; see Step 4 for other versions.
        SparkSession spark = SparkSession
                .builder()
                .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
                .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
                .config("spark.sql.extensions", "org.apache.spark.sql.DliSparkExtension")
                .appName("java_spark_demo")
                .getOrCreate();
        // Create the database.
        spark.sql("create database if not exists test_sparkapp").collect();
        // Create a DLI table and insert two test rows.
        spark.sql("drop table if exists test_sparkapp.dli_testtable").collect();
        spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect();
        spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect();
        spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect();
        // Create an OBS table backed by the CSV file uploaded in Step 2.
        spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect();
        spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect();
        spark.stop();
    }
}
Scala Example Code
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.util.Try

object DliCatalogTest {
  def main(args: Array[String]): Unit = {
    // args(0): SQL statement to run; args(1) (optional): whether it is DDL, default true.
    val sql = args(0)
    val runDdl = Try(args(1).toBoolean).getOrElse(true)
    System.out.println(s"sql is $sql, runDdl is $runDdl")
    val sparkConf = new SparkConf(true)
    sparkConf
      .set("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
      .set("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
    sparkConf.setAppName("dlicatalogtester")
    val spark = SparkSession.builder
      .config(sparkConf)
      .enableHiveSupport()
      .config("spark.sql.extensions", "org.apache.spark.sql.DliSparkExtension")
      .appName("SparkTest")
      .getOrCreate()
    System.out.println("catalog is " + spark.sessionState.catalog.toString)
    if (runDdl) {
      // DDL: execute the statement and discard the result.
      spark.sql(sql).collect()
    } else {
      // Query: print the result rows.
      spark.sql(sql).show()
    }
    spark.close()
  }
}
Python Example Code
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from __future__ import print_function

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # sys.argv[1]: address appended to jdbc:mysql:// (for example, host:port)
    url = sys.argv[1]
    # DLI table that maps to the test.test table in RDS through JDBC.
    create_tbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \
                 "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \
                 " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url
    spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \
        .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \
        .config("spark.sql.extensions", "org.apache.spark.sql.DliSparkExtension") \
        .appName("python Spark test catalog") \
        .getOrCreate()
    spark.sql("CREATE database if not exists test_sparkapp").collect()
    spark.sql("drop table if exists test_sparkapp.dli_rds").collect()
    spark.sql(create_tbl).collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("drop table test_sparkapp.dli_rds").collect()
    spark.stop()