Accessing a DLI Table Using a Spark Job
Scenario
DLI allows Spark jobs to access DLI tables created by SQL jobs. All non-partitioned DLI tables and DLI partitioned tables created using the datasource syntax are supported. DLI partitioned tables created using Hive syntax are not supported. For details about table creation statements, see the Data Lake Insight SQL Syntax Reference.
Procedure
- Grant the permission to access DLI tables
You need to grant Spark the permission to access the DLI table, or the database where the table is located. Currently, authorization can be performed only using SQL statements. You can check whether a user has the permission on the Databases and Tables page of the management console.
- Statements for granting permissions:
grant DLI_SPARK_APP_READ on databases.${dbName} TO USER ${userName}
grant DLI_SPARK_APP_READ on databases.${dbName}.tables.${tableName} TO USER ${userName}
- Statements for revoking permissions:
revoke DLI_SPARK_APP_READ on databases.${dbName} FROM USER ${userName}
revoke DLI_SPARK_APP_READ on databases.${dbName}.tables.${tableName} FROM USER ${userName}
- Because of token caching, a granted or revoked permission may take up to 15 minutes to take effect.
- After a database is authorized, all DLI tables in that database can be accessed using Spark; tables in other databases cannot.
- The granted permission does not include write access to DLI tables.
- Before authorizing a sub-user to use this function, ensure that the sub-user has the Agent Operator permission. Otherwise, the sub-user cannot use this function.
- When you grant permissions to a sub-user for the first time, a custom policy must be created for the tenant to which the sub-user belongs. Therefore, the account that submits the authorization request must hold the Security Administrator permission, for example, a tenant account.
- Compile a Spark program to access the DLI table.
- Specify the database and table names using parameters.
val map = new mutable.HashMap[String, String]()
map("dli.table.databaseName") = dbName
map("dli.table.tableName") = tableName
- Set format to dli_table to access the specified DLI table, as shown in the sketch after this list.
- To access a CarbonData table, you also need to add the following configuration item:
sparkConf.setAll(Map("spark.sql.extensions" -> Seq("org.apache.spark.sql.CarbonInternalExtensions",
  "org.apache.spark.sql.DliSparkExtension").mkString(",")))
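For reference, here is how the parameters and the format come together in the read call; this is a minimal sketch assuming sparkSession and map are initialized as in the complete example below:
val dataFrame = sparkSession.read
  .format("dli_table")   // DLI datasource format
  .options(map.toMap)    // database and table name parameters
  .load()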
Troubleshooting
- Problem 1
- Problem description: When granting the permission to access DLI tables, the following error message is displayed: "Execute Job failed. Add user: ** to group: ** failed. status code: 400, body: {"error": {"message": "group number of user [***] over 10.", "code": 400, "error_code": null, "error_msg": null, "title": "Bad Request"}}".
- Cause: The number of user groups to which the grantee belongs exceeds the upper limit (10).
- Solution: Check the user groups to which the grantee belongs, remove the grantee from unnecessary groups, or check whether the IAM user group quota can be increased.
- Problem 2
- Problem description: When a Spark job runs a query, the following error message is displayed: "reason: User class threw exception: java.lang.Exception: Get table meta failed. User do not have DLI_SPARK_APP_READ privilege on db.tableName.".
- Cause: The user who submitted the job does not have permission to access the target table. Note that even the resource tenant or the table owner can access the table only after being granted the permission.
- Solution: Grant the permission by referring to the first step in Procedure (Grant the permission to access DLI tables).
Complete Scala Example Code
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Target database and table; replace with your own names.
val dbName = "test_db"
val tableName = "test_table"
val isCarbonData = "false"
val sparkConf = new SparkConf()
// The extensions configuration is required only when accessing a CarbonData table.
if (isCarbonData.equals("true")) {
  sparkConf.setAll(Map("spark.sql.extensions" -> Seq("org.apache.spark.sql.CarbonInternalExtensions",
    "org.apache.spark.sql.DliSparkExtension").mkString(",")))
}
val sparkSession = SparkSession.builder().appName("ReadDLITableTest").config(sparkConf).getOrCreate()
// Pass the database and table names as datasource options.
val map = new mutable.HashMap[String, String]()
map("dli.table.databaseName") = dbName
map("dli.table.tableName") = tableName
// Read the DLI table through the dli_table datasource format.
val dataFrame = sparkSession.read.format("dli_table").options(map.toMap).load()
dataFrame.collect()
When executing the sample code, you need to select a system module (named sys.datasource.dli-inner-table) as the dependency in Advanced Settings > Select Dependency Resources > Module Name on the Spark job creation page. For details about how to submit a Spark job, see the Data Lake Insight User Guide.
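Beyond collect(), you can inspect the loaded DataFrame with standard Spark operations; for example (output depends on your table's schema):
dataFrame.printSchema() // print column names and types
dataFrame.show(10)      // preview the first 10 rows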
Complete Python Example Code
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # The extensions configuration is required only when accessing a CarbonData table.
    conf = SparkConf().set('spark.sql.extensions', "org.apache.spark.sql.CarbonInternalExtensions,org.apache.spark.sql.DliSparkExtension")
    sparkSession = SparkSession.builder.appName("SparkVisitDliCarbonTest").config(conf=conf).getOrCreate()
    # Read the DLI table through the dli_table datasource format.
    dataFrame = sparkSession.read.format("dli_table") \
        .option('dli.table.databaseName', 'test_db') \
        .option('dli.table.tableName', 'test_table') \
        .load()
    dataFrame.show()
    sparkSession.stop()
When executing the sample code, you need to select a system module (named sys.datasource.dli-inner-table) as the dependency in Advanced Settings > Select Dependency Resources > Module Name on the Spark job creation page. For details about how to submit a Spark job, see the Data Lake Insight User Guide.