Using the Spark Job to Access DLI Metadata
Scenario
DLI allows Spark jobs to access OBS tables created by SQL jobs. Both non-partitioned and partitioned OBS tables are supported. For details about how to create an OBS table, see the Data Lake Insight SQL Syntax Reference.
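For orientation, the following is a minimal PySpark sketch of creating an OBS table; the database name, table schema, and OBS path are hypothetical placeholders, and the authoritative DDL syntax is defined in the Data Lake Insight SQL Syntax Reference.

```python
# Minimal sketch: create an OBS table from a Spark session.
# demo_db, demo_obs_table, and the obs:// path are placeholders;
# the catalog settings described under "Procedure" below are assumed
# to be configured for the job.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS demo_db").collect()
spark.sql(
    "CREATE TABLE IF NOT EXISTS demo_db.demo_obs_table (id INT, name STRING) "
    "USING parquet "
    "LOCATION 'obs://your-bucket/path/demo_obs_table'"
).collect()
```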
Description
- You can create databases and tables in SQL jobs and read and insert data using SQL or Spark jobs.
- You can create databases and tables in Spark jobs and read and insert data using SQL or Spark jobs.
- You can create databases in SQL jobs, create tables in Spark jobs, and read and insert data using SQL or Spark jobs (a minimal sketch of this combination follows this list).
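As a sketch of the third combination, the Spark-job side might look as follows in PySpark; demo_db is assumed to have been created by a SQL job beforehand, and the table name and schema are placeholders.

```python
# Minimal sketch: demo_db is assumed to already exist (created by a
# SQL job); this Spark job creates the table and reads/inserts data.
# The catalog settings from the "Procedure" section are assumed set.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# DDL takes effect only after collect() is called (see the note under
# "Procedure" below).
spark.sql(
    "CREATE TABLE IF NOT EXISTS demo_db.demo_table (id INT, name STRING)"
).collect()

spark.sql("INSERT INTO demo_db.demo_table VALUES (1, 'a')").collect()
spark.sql("SELECT * FROM demo_db.demo_table").show()
```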
Procedure
- Compile a Spark program to access DLI metadata.
- Specify spark.sql.session.state.builder and spark.sql.catalog.class when creating SparkSession.
```scala
sparkConf
  .set("spark.sql.session.state.builder",
    "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
  .set("spark.sql.catalog.class",
    "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
```
- Specify org.apache.spark.sql.CarbonInternalExtensions when creating a Spark session.
```scala
val spark = SparkSession.builder
  .config(sparkConf)
  .enableHiveSupport()
  .config("spark.sql.extensions",
    Seq("org.apache.spark.sql.CarbonInternalExtensions",
      "org.apache.spark.sql.DliSparkExtension").mkString(","))
  .appName("SparkTest")
  .getOrCreate()
```
- Run SQL statements or perform other operations to access the DLI table.
If the SQL statement is a DDL statement, it takes effect only after collect() is called on it, as in the following sketch.
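For example, a minimal PySpark sketch, assuming a SparkSession spark configured as above (the database name is a placeholder):

```python
# The CREATE DATABASE statement only takes effect once collect() runs.
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db").collect()
```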
- Submit the Spark application to DLI.
"catalog_name": "dli" needs to be added to the submitted parameter. For details, see the Data Lake Insight User Guide or Data Lake Insight API Reference. Add "spark.dli.metaAccess.enable":"true" to the conf file. Configure "spark.sql.warehouse.dir": "obs://bucket/warehousepath" in the conf file if you need to run the DDL.
Example:
{ "queue":"citest", "file":"spark-dli-catalog-tester-1.0-SNAPSHOT.jar", "className":"DliCatalogTest", "args":[ " select * from db_all_tb_type.tb_part_obs_hive_orc" ], "conf":{"spark.sql.warehouse.dir": "obs://bucket/warehousepath", "spark.dli.metaAccess.enable":"true"}, "sc_type":"A", "executorCores":1, "numExecutors":6, "executorMemory":"4G", "driverCores":2, "driverMemory":"7G", "catalog_name": "dli" }
Example Code
- Spark example code
```scala
import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DliCatalogTest {
  def main(args: Array[String]): Unit = {
    val sql = args(0)
    val runDdl = Try(args(1).toBoolean).getOrElse(true)
    System.out.println(s"sql is $sql runDdl is $runDdl")

    // Point Spark at the DLI metadata catalog.
    val sparkConf = new SparkConf(true)
    sparkConf
      .set("spark.sql.session.state.builder",
        "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder")
      .set("spark.sql.catalog.class",
        "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog")
    sparkConf.setAppName("dlicatalogtester")

    val spark = SparkSession.builder
      .config(sparkConf)
      .enableHiveSupport()
      .config("spark.sql.extensions",
        Seq("org.apache.spark.sql.CarbonInternalExtensions",
          "org.apache.spark.sql.DliSparkExtension").mkString(","))
      .appName("SparkTest")
      .getOrCreate()

    System.out.println("catalog is " + spark.sessionState.catalog.toString)

    // DDL statements take effect only after collect() is called.
    if (runDdl) {
      val df = spark.sql(sql).collect()
    } else {
      spark.sql(sql).show()
    }

    spark.close()
  }
}
```
- Python example code
```python
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from __future__ import print_function

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    url = sys.argv[1]
    createTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \
                "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \
                " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url

    # Point Spark at the DLI metadata catalog.
    spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .config("spark.sql.session.state.builder",
                "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \
        .config("spark.sql.catalog.class",
                "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \
        .config("spark.sql.extensions",
                ','.join(["org.apache.spark.sql.CarbonInternalExtensions",
                          "org.apache.spark.sql.DliSparkExtension"])) \
        .appName("python Spark test catalog") \
        .getOrCreate()

    # DDL statements take effect only after collect() is called.
    spark.sql("CREATE database if not exists test_sparkapp").collect()
    spark.sql("drop table if exists test_sparkapp.dli_rds").collect()
    spark.sql(createTbl).collect()

    # Read and write data through the DLI table.
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()

    spark.sql("drop table test_sparkapp.dli_rds").collect()
    spark.stop()
```