Submitting a Spark Jar Job Using Livy
Introduction to DLI Livy
DLI Livy is an Apache Livy-based client tool used to submit Spark jobs to DLI.
Preparations
- Create a queue and set Queue Usage to For general purpose. This queue provides the computing resources for the Spark job.
- Prepare a Linux ECS for installing DLI Livy.
- Enable ports 30000 to 32767 and port 8998 on the ECS. For details, see Adding a Security Group Rule.
- Install a JDK on the ECS (JDK 1.8 is recommended) and configure the JAVA_HOME environment variable.
- View the ECS details to obtain its private IP address.
- Use an enhanced datasource connection to connect the DLI queue to the VPC where the Livy instance is located.
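Before installing, it can help to confirm the ECS-side preparations from a shell. The following is a minimal sketch, not part of DLI Livy: the function names check_java_home and is_required_port are hypothetical, and the port numbers are the ones listed above (8998, which this guide later sets as the Livy server port, and 30000 to 32767 for the launcher).

```shell
# Preflight sketch for the ECS that will host DLI Livy.
# Function names are illustrative; they are not part of DLI Livy.

# Succeeds when JAVA_HOME is set and contains an executable bin/java.
check_java_home() {
    [ -n "${JAVA_HOME:-}" ] && [ -x "${JAVA_HOME}/bin/java" ]
}

# Succeeds when a port is one this guide asks you to enable:
# 8998 or any port in the range 30000-32767.
is_required_port() {
    port="$1"
    case "$port" in
        ''|*[!0-9]*) return 1 ;;   # reject empty or non-numeric input
    esac
    [ "$port" -eq 8998 ] || { [ "$port" -ge 30000 ] && [ "$port" -le 32767 ]; }
}

if check_java_home; then
    echo "JAVA_HOME OK: ${JAVA_HOME}"
else
    echo "JAVA_HOME is not set or does not contain bin/java"
fi
```

The script only inspects the local environment. It does not verify the security group rules or the enhanced datasource connection, which still need to be checked on the console.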
Downloading and Installing DLI Livy
The software package used in the following operations is apache-livy-0.7.2.0103-bin.tar.gz. If you download a later version, replace the file name in the commands accordingly.
- Download the DLI Livy software package.
- Use WinSCP to upload the obtained software package to the prepared ECS directory.
- Log in to the ECS as user root and perform the following steps to install DLI Livy:
- Run the following command to create an installation directory:
mkdir Livy installation directory
For example, to create the /opt/livy directory, run the mkdir /opt/livy command. The following operations use the /opt/livy installation directory as an example. Replace it as required.
- Run the following command to decompress the software package to the installation directory:
tar --extract --file apache-livy-0.7.2.0103-bin.tar.gz --directory /opt/livy --strip-components 1 --no-same-owner
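- Run the following commands in the conf directory of the installation (/opt/livy/conf in this example) to rename the configuration templates and create an empty spark-defaults.conf file:
cd /opt/livy/conf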
mv livy-client.conf.template livy-client.conf
mv livy.conf.template livy.conf
mv livy-env.sh.template livy-env.sh
mv log4j.properties.template log4j.properties
mv spark-blacklist.conf.template spark-blacklist.conf
touch spark-defaults.conf
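The rename step can also be scripted. The sketch below assumes that every file ending in .template in the conf directory should be renamed, which is broader than the five files listed in the manual commands; rename_templates is a hypothetical helper name, not something shipped with DLI Livy.

```shell
# Rename every *.template file in a Livy conf directory and create an
# empty spark-defaults.conf, mirroring the manual mv/touch commands.
# $1: path to the conf directory.
rename_templates() {
    conf_dir="$1"
    for template in "$conf_dir"/*.template; do
        [ -e "$template" ] || continue   # nothing (left) to rename
        mv "$template" "${template%.template}"
    done
    touch "$conf_dir/spark-defaults.conf"
}

# Example usage with the paths used in this guide:
# mkdir -p /opt/livy
# tar --extract --file apache-livy-0.7.2.0103-bin.tar.gz --directory /opt/livy --strip-components 1 --no-same-owner
# rename_templates /opt/livy/conf
```

Running the helper a second time changes nothing, so it is safe to repeat after a partial installation.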
Modifying the DLI Livy Configuration File
- Upload the DLI Livy JAR packages to an OBS bucket directory.
- Log in to the OBS console and, in the target OBS bucket, create a directory for storing the DLI Livy JAR packages, for example, obs://bucket/livy/jars/.
- On the ECS, go to the DLI Livy installation directory created in Downloading and Installing DLI Livy, obtain the Livy JAR packages, and upload them to the OBS bucket directory created in the previous step.
For example, if the installation path is /opt/livy, upload the following JAR packages:
/opt/livy/rsc-jars/livy-api-0.7.2.0103.jar
/opt/livy/rsc-jars/livy-rsc-0.7.2.0103.jar
/opt/livy/repl_2.11-jars/livy-core_2.11-0.7.2.0103.jar
/opt/livy/repl_2.11-jars/livy-repl_2.11-0.7.2.0103.jar
- Modify the DLI Livy configuration file.
- Run the following command to modify the /opt/livy/conf/livy-client.conf configuration file:
vi /opt/livy/conf/livy-client.conf
Add the following content to the file and modify the configuration items as required:
# Set the private IP address of the ECS, which can be obtained by running the ifconfig command.
livy.rsc.launcher.address = X.X.X.X
# Set the ports enabled on the ECS.
livy.rsc.launcher.port.range = 30000~32767
- Run the following command to modify the /opt/livy/conf/livy.conf configuration file:
vi /opt/livy/conf/livy.conf
Add the following content to the file and modify the configuration items as required:
livy.server.port = 8998
livy.spark.master = yarn
livy.server.contextLauncher.custom.class=org.apache.livy.rsc.DliContextLauncher
livy.server.batch.custom.class=org.apache.livy.server.batch.DliBatchSession
livy.server.interactive.custom.class=org.apache.livy.server.interactive.DliInteractiveSession
livy.server.sparkApp.custom.class=org.apache.livy.utils.SparkDliApp
livy.server.recovery.mode = recovery
livy.server.recovery.state-store = filesystem
# Change the following file directory of DLI Livy as needed:
livy.server.recovery.state-store.url = file:///opt/livy/store/
livy.server.session.timeout-check = true
livy.server.session.timeout = 1800s
livy.server.session.state-retain.sec = 1800s
livy.dli.spark.version = 2.3.2
livy.dli.spark.scala-version = 2.11
# Enter the OBS bucket directory created earlier for storing the Livy JAR packages.
livy.repl.jars = obs://bucket/livy/jars/livy-core_2.11-0.7.2.0103.jar, obs://bucket/livy/jars/livy-repl_2.11-0.7.2.0103.jar
livy.rsc.jars = obs://bucket/livy/jars/livy-api-0.7.2.0103.jar, obs://bucket/livy/jars/livy-rsc-0.7.2.0103.jar
- Run the following command to modify the /opt/livy/conf/spark-defaults.conf configuration file:
vi /opt/livy/conf/spark-defaults.conf
Add the following parameters to the file. For details about the parameter configurations, see Table 1.
# The following parameters can be overwritten when a job is submitted.
spark.yarn.isPython=true
spark.pyspark.python=python3
# Enter the production environment URL of DLI.
spark.dli.user.uiBaseAddress=https://console.huaweicloud.com/dli/web
# Set the region where the queue is located.
spark.dli.user.regionName=XXXX
# Set the DLI endpoint address.
spark.dli.user.dliEndPoint=XXXX
# Enter the name of the created DLI queue.
spark.dli.user.queueName=XXXX
# Set the AK used for submitting a job.
spark.dli.user.access.key=XXXX
# Set the SK used for submitting a job.
spark.dli.user.secret.key=XXXX
# Set the project ID used for submitting a job.
spark.dli.user.projectId=XXXX
Table 1 Mandatory parameters in spark-defaults.conf
| Parameter | Description |
|---|---|
| spark.dli.user.regionName | Name of the region where the DLI queue is located. You can obtain the region name from . |
| spark.dli.user.dliEndPoint | Endpoint where the DLI queue is located. You can obtain the endpoint from . |
| spark.dli.user.queueName | Queue name. |
| spark.dli.user.access.key | Access key (AK) of a user. The user must have the Spark job permissions. For details, see . For details about how to obtain the AK/SK, see . |
| spark.dli.user.secret.key | Secret key (SK) of the same user, obtained together with the AK. |
| spark.dli.user.projectId | Project ID, which can be obtained by referring to "Obtaining a Project ID". |
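Because every parameter in Table 1 is mandatory, it can help to check spark-defaults.conf for missing entries before starting DLI Livy. The following is a minimal sketch: the function name missing_dli_keys is hypothetical, and the check only looks for an uncommented line that starts with each key followed by "=". It does not validate the values, so a placeholder such as XXXX still counts as present.

```shell
# Hypothetical helper: print each mandatory key from Table 1 that has no
# uncommented "key=value" line in the file given as $1.
# Returns non-zero if at least one key is missing.
missing_dli_keys() {
    conf_file=$1
    missing=0
    for key in regionName dliEndPoint queueName access.key secret.key projectId; do
        # The "." inside a key matches any character in this pattern, which
        # is loose but harmless for a presence check.
        if ! grep -q "^[[:space:]]*spark\.dli\.user\.$key[[:space:]]*=" "$conf_file"; then
            echo "spark.dli.user.$key"
            missing=1
        fi
    done
    return "$missing"
}
```

For the example installation directory, the call would be missing_dli_keys /opt/livy/conf/spark-defaults.conf. No output and a zero exit status mean all six keys are present.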
The following parameters are optional. Set them based on the parameter description and site requirements. For details about these parameters, see Spark Configuration.
Table 2 Optional parameters in spark-defaults.conf
| Spark Job Parameter | Spark Batch Processing Parameter | Remarks |
|---|---|---|
| spark.dli.user.file | file | Not required for connecting to the notebook tool |
| spark.dli.user.className | class_name | Not required for connecting to the notebook tool |
| spark.dli.user.scType | sc_type | Same as the native Livy configuration |
| spark.dli.user.args | args | Same as the native Livy configuration |
| spark.submit.pyFiles | python_files | Same as the native Livy configuration |
| spark.files | files | Same as the native Livy configuration |
| spark.dli.user.modules | modules | - |
| spark.dli.user.image | image | Custom image used for submitting a job. This parameter is available for container clusters only and is not set by default. |
| spark.dli.user.autoRecovery | auto_recovery | - |
| spark.dli.user.maxRetryTimes | max_retry_times | - |
| spark.dli.user.catalogName | catalog_name | To access metadata, set this parameter to dli. |
Starting DLI Livy
- Run the following command to go to the DLI Livy installation directory:
Example: cd /opt/livy
- Run the following command to start DLI Livy:
./bin/livy-server start
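After the server starts, a quick request can confirm that it is listening. The following is a minimal sketch, assuming DLI Livy keeps the standard Apache Livy GET /batches endpoint; this page does not confirm that. The function name livy_is_up is hypothetical.

```shell
# Hypothetical helper: succeeds if an HTTP server answers GET /batches on the
# given host ($1) and port ($2, default 8998, the livy.server.port value
# configured in livy.conf).
livy_is_up() {
    curl --silent --fail --max-time 5 --output /dev/null \
        "http://$1:${2:-8998}/batches"
}
```

For example, livy_is_up ECS_IP && echo "DLI Livy is reachable", where ECS_IP is the private IP address of the ECS.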
Submitting a Spark Job Using DLI Livy
The following example submits a Spark job to DLI through DLI Livy by using the curl command.
- Upload the JAR file of the developed Spark job program to the OBS directory.
For example, upload spark-examples_2.11-XXXX.jar to the obs://bucket/path directory.
- Log in as user root to the ECS where DLI Livy is installed.
- Run the curl command to submit a Spark job request to DLI using DLI Livy.
NOTE:
ECS_IP indicates the private IP address of the ECS where DLI Livy is installed.
curl --location --request POST 'http://ECS_IP:8998/batches' \
--header 'Content-Type: application/json' \
--data-raw '{
    "driverMemory": "3G",
    "driverCores": 1,
    "executorMemory": "2G",
    "executorCores": 1,
    "numExecutors": 1,
    "args": [
        "1000"
    ],
    "file": "obs://bucket/path/spark-examples_2.11-XXXX.jar",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.dynamicAllocation.minExecutors": 1,
        "spark.executor.instances": 1,
        "spark.dynamicAllocation.initialExecutors": 1,
        "spark.dynamicAllocation.maxExecutors": 2
    }
}'
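Once the request is accepted, the job status can be polled. The following is a minimal sketch, assuming DLI Livy behaves like standard Apache Livy: the POST response is expected to contain the batch id, and GET /batches/{id}/state is expected to return a JSON object with a "state" field. Neither is confirmed on this page, and the set of terminal states is likewise taken from Apache Livy. The function names batch_state_from_json and wait_for_batch are hypothetical.

```shell
# Extract the "state" field from a Livy JSON response read on stdin.
# A sed pattern keeps the sketch dependency-free; a JSON parser is more robust.
batch_state_from_json() {
    sed -n 's/.*"state"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Hypothetical helper: poll the state of batch $2 on Livy host $1 every
# 10 seconds until it reaches a terminal state, then print that state.
# Returns 0 for "success" and 1 for any other terminal state.
# Note: if the server is unreachable, the state stays empty and the loop
# keeps retrying until interrupted.
wait_for_batch() {
    while true; do
        state=$(curl --silent "http://$1:8998/batches/$2/state" | batch_state_from_json)
        case "$state" in
            success) echo "$state"; return 0 ;;
            dead|killed|error) echo "$state"; return 1 ;;
        esac
        sleep 10
    done
}
```

For example, if the submission returned a batch with id 0, wait_for_batch ECS_IP 0 blocks until the job finishes.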