
Using the Spark Job to Access DLI Metadata

Scenario

DLI allows Spark jobs to access OBS tables created by SQL jobs. Both non-partitioned and partitioned OBS tables are supported. For details about how to create an OBS table, see the Data Lake Insight SQL Syntax Reference.

Description

  • You can create databases and tables in SQL jobs and read and insert data using either SQL or Spark jobs (see the sketch after this list).
  • You can create databases and tables in Spark jobs and read and insert data using either SQL or Spark jobs.
  • You cannot create a database in a SQL job, then create tables in that database in a Spark job, and read and insert data using SQL or Spark jobs. Mixing the two job types to create the metadata of one database is not supported.
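
  For example, assuming a SQL job has already created a database test_sparkapp and an OBS table tb_obs_demo in it (both names are hypothetical; see the Data Lake Insight SQL Syntax Reference for the CREATE TABLE syntax), a Spark job configured as described in Procedure can read and insert data, as in this minimal sketch:

    // The table test_sparkapp.tb_obs_demo is assumed to have been created by a SQL job.
    spark.sql("insert into table test_sparkapp.tb_obs_demo select 1,'aaa'").collect()
    spark.sql("select * from test_sparkapp.tb_obs_demo").show()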

Procedure

  1. Compile a Spark program to access DLI metadata.
    1. Specify spark.sql.session.state.builder and spark.sql.catalog.class when creating SparkSession.
      sparkConf
           .set("spark.sql.session.state.builder",
              "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
           .set("spark.sql.catalog.class",    
              "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
    2. Add org.apache.spark.sql.CarbonInternalExtensions and org.apache.spark.sql.DliSparkExtension to spark.sql.extensions when creating the SparkSession.
      val spark =
          SparkSession.builder
            .config(sparkConf)
            .enableHiveSupport()
            .config("spark.sql.extensions",
          Seq("org.apache.spark.sql.CarbonInternalExtensions",
              "org.apache.spark.sql.DliSparkExtension").mkString(","))
            .appName("SparkTest")
            .getOrCreate()
    3. Run the SQL statement or perform other operations to access the DLI table.

      If the SQL statement is a DDL statement, it takes effect only after collect() is called, as shown in the sketch below.
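
      For example, a minimal sketch (the table name tb_demo is hypothetical):

      // DDL statement: takes effect only after collect() is called
      spark.sql("create table if not exists tb_demo(id int, name string)").collect()
      // Query: show() triggers execution and prints the result
      spark.sql("select * from tb_demo").show()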

  2. Submit the Spark application to DLI.

    "catalog_name": "dli" needs to be added to the submitted parameter. For details, see the Data Lake Insight User Guide or Data Lake Insight API Reference. Add "spark.dli.metaAccess.enable":"true" to the conf file. Configure "spark.sql.warehouse.dir": "obs://bucket/warehousepath" in the conf file if you need to run the DDL.

    Example:

    {
        "queue":"citest",
        "file":"spark-dli-catalog-tester-1.0-SNAPSHOT.jar",
        "className":"DliCatalogTest",
        "args":[
             " select * from db_all_tb_type.tb_part_obs_hive_orc"
        ],
        "conf":{"spark.sql.warehouse.dir": "obs://bucket/warehousepath",
        "spark.dli.metaAccess.enable":"true"},
        "sc_type":"A",
        "executorCores":1,
        "numExecutors":6,
        "executorMemory":"4G",
        "driverCores":2,
        "driverMemory":"7G",
        "catalog_name": "dli"
    }

Example Code

  • Spark example code
    import scala.util.Try

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object DliCatalogTest {
      def main(args: Array[String]): Unit = {
        val sql = args(0)
        val runDdl = Try(args(1).toBoolean).getOrElse(true)
        System.out.println(s"sql is $sql, runDdl is $runDdl")

        val sparkConf = new SparkConf(true)
        sparkConf
          .set("spark.sql.session.state.builder",
            "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
          .set("spark.sql.catalog.class",
            "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
        sparkConf.setAppName("dlicatalogtester")

        val spark = SparkSession.builder
          .config(sparkConf)
          .enableHiveSupport()
          .config("spark.sql.extensions",
            Seq("org.apache.spark.sql.CarbonInternalExtensions",
              "org.apache.spark.sql.DliSparkExtension").mkString(","))
          .appName("SparkTest")
          .getOrCreate()

        System.out.println("catalog is " + spark.sessionState.catalog.toString)

        // DDL statements take effect only after collect() is called.
        if (runDdl) {
          val df = spark.sql(sql).collect()
        } else {
          spark.sql(sql).show()
        }

        spark.close()
      }
    }
  • Python example code
    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    
    from __future__ import print_function
    
    import sys
    
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        url = sys.argv[1]
        creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \
                  "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \
                  " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url
    
        spark = SparkSession \
            .builder \
            .enableHiveSupport() \
            .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \
            .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \
            .config("spark.sql.extensions", ','.join(["org.apache.spark.sql.CarbonInternalExtensions", "org.apache.spark.sql.DliSparkExtension"])) \
            .appName("python Spark test catalog") \
            .getOrCreate()

        spark.sql("CREATE database if not exists test_sparkapp").collect()
        spark.sql("drop table if exists test_sparkapp.dli_rds").collect()
        spark.sql(creatTbl).collect()
        spark.sql("select * from test_sparkapp.dli_rds").show()
        spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect()
        spark.sql("select * from test_sparkapp.dli_rds").show()
        spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect()
        spark.sql("select * from test_sparkapp.dli_rds").show()
        spark.sql("drop table test_sparkapp.dli_rds").collect()
        spark.stop()