- What's New
- Function Overview
- Product Bulletin
- Service Overview
-
Billing
- Billing Overview
- Billing for Compute Resources
- Billing for Storage Resources
- Billing for Scanned Data
- Yearly/Monthly Billing
- Billing Examples
- Renewing Subscriptions
- Bills
- Arrears
- Billing Termination
-
Billing FAQ
- What Billing Modes Does DLI Offer?
- Would a Pay-Per-Use DLI Queue Not Be Billed if No Job Is Submitted for Execution?
- When Is a Data Lake Queue Idle?
- How Do I Troubleshoot DLI Billing Issues?
- Why Am I Still Being Billed on a Pay-per-Use Basis After I Purchased a Yearly/Monthly Package?
- How Do I View the Usage of a Package?
- How Do I Purchase a 30-CU Queue?
- How Will I Be Billed If I Purchased a Pay-per-Use 16-CU DLI Queue but Only Used 2 CUs to Run Jobs?
- How Do I View a Job's Scanned Data Volume?
- Would a Pay-Per-Use Elastic Resource Pool Not Be Billed if No Job Is Submitted for Execution?
- Do I Need to Pay Extra Fees for Purchasing a Queue Billed Based on the Scanned Data Volume?
- How Is the Usage Beyond the Package Limit Billed?
- What Are the Actual CUs, CU Range, and Specifications of an Elastic Resource Pool?
- Change History
- Getting Started
-
User Guide
- DLI Console Overview
-
Creating an Elastic Resource Pool and Queues Within It
- Overview of DLI Elastic Resource Pools and Queues
- Creating an Elastic Resource Pool
- Creating Queues in an Elastic Resource Pool
- Creating a Standard Queue (Discarded and Not Recommended)
- Managing Elastic Resource Pools
-
Managing Queues
- Queue Permission Management
- Adjusting Scaling Policies for Queues in an Elastic Resource Pool
- Allocating a Queue to an Enterprise Project
- Creating an SMN Topic
- Managing Queue Tags
- Setting Queue Properties
- Testing Address Connectivity
- Modifying Queue Specifications
- Deleting a Queue
- Elastic Scaling of Standard Queues (Discarded and Not Recommended)
- Setting a Scheduled Elastic Scaling Task for a Standard Queue (Discarded and Not Recommended)
- Changing the CIDR Block of a Standard Queue (Discarded and Not Recommended)
- Example Use Case: Creating an Elastic Resource Pool and Running Jobs
- Example Use Case: Configuring Scaling Policies for Queues in an Elastic Resource Pool
- Job Management
- Data Management
-
Enhanced Datasource Connections
- Overview
- Cross-Source Analysis Development Methods
- Creating an Enhanced Datasource Connection
- Deleting an Enhanced Datasource Connection
- Modifying Host Information in an Elastic Resource Pool
- Binding and Unbinding a Queue
- Adding a Route
- Deleting a Route
- Enhanced Connection Permission Management
- Enhanced Datasource Connection Tag Management
- Datasource Authentication
- Job Templates
- DLI Agency Permissions
- Creating a DLI Global Variable
- Configuring a DLI Job Bucket
- SQL Inspector
- Creating a Custom Image
- Permissions Management
- Other Common Operations
- Permission Management for Global Variables
- Best Practices
-
Developer Guide
- SQL Jobs
-
Flink OpenSource SQL Jobs
- Reading Data from Kafka and Writing Data to RDS
- Reading Data from Kafka and Writing Data to GaussDB(DWS)
- Reading Data from Kafka and Writing Data to Elasticsearch
- Reading Data from MySQL CDC and Writing Data to GaussDB(DWS)
- Reading Data from PostgreSQL CDC and Writing Data to GaussDB(DWS)
- Configuring High-Reliability Flink Jobs (Automatic Restart upon Exceptions)
- Flink Jar Jobs
-
Spark Jar Jobs
- Using Spark Jar Jobs to Read and Query OBS Data
- Using the Spark Job to Access DLI Metadata
- Using Spark-submit to Submit a Spark Jar Job
- Submitting a Spark Jar Job Using Livy
- Using Spark Jobs to Access Data Sources of Datasource Connections
- Change History
-
Spark SQL Syntax Reference
- Common Configuration Items
- Spark SQL Syntax
- Spark Open Source Commands
- Databases
-
Tables
- Creating an OBS Table
- Creating a DLI Table
- Deleting a Table
- Viewing a Table
- Modifying a Table
-
Partition-related Syntax
- Adding Partition Data (Only OBS Tables Supported)
- Renaming a Partition (Only OBS Tables Supported)
- Deleting a Partition
- Deleting Partitions by Specifying Filter Criteria (Only Supported on OBS Tables)
- Altering the Partition Location of a Table (Only OBS Tables Supported)
- Updating Partitioned Table Data (Only OBS Tables Supported)
- Updating Table Metadata with REFRESH TABLE
- Backing Up and Restoring Data of Multiple Versions
- Table Lifecycle Management
- Data
- Exporting Query Results
-
Datasource Connections
- Creating a Datasource Connection with an HBase Table
- Creating a Datasource Connection with an OpenTSDB Table
- Creating a Datasource Connection with a DWS Table
- Creating a Datasource Connection with an RDS Table
- Creating a Datasource Connection with a CSS Table
- Creating a Datasource Connection with a DCS Table
- Creating a Datasource Connection with a DDS Table
- Creating a Datasource Connection with an Oracle Table
- Views
- Viewing the Execution Plan
- Data Permissions
- Data Types
- User-Defined Functions
-
Built-In Functions
-
Date Functions
- Overview
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
String Functions
- Overview
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- Mathematical Functions
- Aggregate Functions
- Window Functions
- Other Functions
-
Date Functions
- SELECT
-
Identifiers
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- class_name
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- hdfs_path
- input_expression
- input_format_classname
- jar_path
- join_condition
- non_equi_join_condition
- number
- num_buckets
- output_format_classname
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- row_format
- select_statement
- separator
- serde_name
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- view_name
- view_properties
- when_expression
- where_condition
- window_function
- Operators
-
Flink SQL Syntax Reference
-
Flink OpenSource SQL 1.15 Syntax Reference
- Constraints and Definitions
- Overview
- Flink OpenSource SQL 1.15 Usage
- Formats
- Connectors
- DML Snytax
-
Functions
- UDFs
- Type Inference
- Parameter Transfer
-
Built-In Functions
- Comparison Functions
- Logical Functions
- Arithmetic Functions
- String Functions
- Temporal Functions
- Conditional Functions
- Type Conversion Functions
- Collection Functions
- JSON Functions
- Value Construction Functions
- Value Retrieval Functions
- Grouping Functions
- Hash Functions
- Aggregate Functions
- Table-Valued Functions
- Flink OpenSource SQL 1.12 Syntax Reference
-
Flink Opensource SQL 1.10 Syntax Reference
- Constraints and Definitions
- Flink OpenSource SQL 1.10 Syntax
-
Data Definition Language (DDL)
- Creating a Source Table
-
Creating a Result Table
- ClickHouse Result Table
- Kafka Result Table
- Upsert Kafka Result Table
- DIS Result Table
- JDBC Result Table
- GaussDB(DWS) Result Table
- Redis Result Table
- SMN Result Table
- HBase Result Table
- Elasticsearch Result Table
- OpenTSDB Result Table
- User-defined Result Table
- Print Result Table
- File System Result Table
- Creating a Dimension Table
- Data Manipulation Language (DML)
- Functions
-
Historical Version
-
Flink SQL Syntax (This Syntax Will Not Evolve. Use FlinkOpenSource SQL Instead.)
- Constraints and Definitions
- Overview
- Creating a Source Stream
-
Creating a Sink Stream
- CloudTable HBase Sink Stream
- CloudTable OpenTSDB Sink Stream
- MRS OpenTSDB Sink Stream
- CSS Elasticsearch Sink Stream
- DCS Sink Stream
- DDS Sink Stream
- DIS Sink Stream
- DMS Sink Stream
- DWS Sink Stream (JDBC Mode)
- DWS Sink Stream (OBS-based Dumping)
- MRS HBase Sink Stream
- MRS Kafka Sink Stream
- Open-Source Kafka Sink Stream
- File System Sink Stream (Recommended)
- OBS Sink Stream
- RDS Sink Stream
- SMN Sink Stream
- Creating a Temporary Stream
- Creating a Dimension Table
- Custom Stream Ecosystem
- Data Manipulation Language (DML)
- Data Types
- User-Defined Functions
- Built-In Functions
- Geographical Functions
- Configuring Time Models
- Pattern Matching
- StreamingML
- Reserved Keywords
-
Flink SQL Syntax (This Syntax Will Not Evolve. Use FlinkOpenSource SQL Instead.)
-
Flink OpenSource SQL 1.15 Syntax Reference
-
API Reference
- Before You Start
- Overview
- Calling APIs
- Getting Started
- Permission-related APIs
- Global Variable-related APIs
- APIs Related to Enhanced Datasource Connections
-
APIs Related to Elastic Resource Pools
- Creating an Elastic Resource Pool
- Querying All Elastic Resource Pools
- Deleting an Elastic Resource Pool
- Modifying Elastic Resource Pool Information
- Querying All Queues in an Elastic Resource Pool
- Associating a Queue with an Elastic Resource Pool
- Viewing Scaling History of an Elastic Resource Pool
- Modifying the Scaling Policy of a Queue Associated with an Elastic Resource Pool
- Queue-related APIs (Recommended)
- SQL Job-related APIs
- SQL Template-related APIs
-
Flink Job-related APIs
- Creating a SQL Job
- Updating a SQL Job
- Creating a Flink Jar job
- Updating a Flink Jar Job
- Running Jobs in Batches
- Listing Jobs
- Querying Job Details
- Querying the Job Execution Plan
- Stopping Jobs in Batches
- Deleting a Job
- Deleting Jobs in Batches
- Exporting a Flink Job
- Importing a Flink Job
- Generating a Static Stream Graph for a Flink SQL Job
- APIs Related to Flink Job Templates
- Spark Job-related APIs
- APIs Related to Spark Job Templates
- Permissions Policies and Supported Actions
-
Out-of-Date APIs
- Agency-related APIs (Discarded)
-
Package Group-related APIs (Discarded)
- Uploading a Package Group (Discarded)
- Listing Package Groups (Discarded)
- Uploading a JAR Package Group (Discarded)
- Uploading a PyFile Package Group (Discarded)
- Uploading a File Package Group (Discarded)
- Querying Resource Packages in a Group (Discarded)
- Deleting a Resource Package from a Group (Discarded)
- Changing the Owner of a Group or Resource Package (Discarded)
- APIs Related to Spark Batch Processing (Discarded)
- SQL Job-related APIs (Discarded)
- Resource-related APIs (Discarded)
- Permission-related APIs (Discarded)
- Queue-related APIs (Discarded)
- Datasource Authentication-related APIs (Discarded)
- APIs Related to Enhanced Datasource Connections (Discarded)
- Template-related APIs (Discarded)
- APIs Related to Flink Jobs (Discarded)
- Public Parameters
- SDK Reference
-
FAQs
-
Flink Jobs
-
Usage
- What Data Formats and Data Sources Are Supported by DLI Flink Jobs?
- How Do I Authorize a Subuser to View Flink Jobs?
- How Do I Set Auto Restart upon Exception for a Flink Job?
- How Do I Save Flink Job Logs?
- How Can I Check Flink Job Results?
- Why Is Error "No such user. userName:xxxx." Reported on the Flink Job Management Page When I Grant Permission to a User?
- How Do I Know Which Checkpoint the Flink Job I Stopped Will Be Restored to When I Start the Job Again?
- Which Flink Version Does DLI Support? Is Flink 1.13 Supported? Which Version Is the Next?
- Why Is a Message Displayed Indicating That the SMN Topic Does Not Exist When I Use the SMN Topic in DLI?
-
Flink SQL
- How Much Data Can Be Processed in a Day by a Flink SQL Job?
- Does Data in the Temporary Stream of Flink SQL Need to Be Cleared Periodically? How Do I Clear the Data?
- Why Is a Message Displayed Indicating That the OBS Bucket Is Not Authorized When I Select an OBS Bucket for a Flink SQL Job?
- How Do I Create an OBS Partitioned Table for a Flink SQL Job?
- How Do I Change the Number of Kafka Partitions of a Flink SQL Job Without Stopping It?
- How Do I Dump Data to OBS and Create an OBS Partitioned Table?
- Why Is Error Message "DLI.0005" Displayed When I Use an EL Expression to Create a Table in a Flink SQL Job?
- Why Is No Data Queried in the DLI Table Created Using the OBS File Path When Data Is Written to OBS by a Flink Job Output Stream?
- Why Does a Flink SQL Job Fails to Be Executed, and Is "connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null" Displayed in the Log?
- Why Is Error "Not authorized" Reported When a Flink SQL Job Reads DIS Data?
- Data Writing Fails After a Flink SQL Job Consumed Kafka and Sank Data to the Elasticsearch Cluster
- How Does Flink Opensource SQL Parse Nested JSON?
- Why Is the RDS Database Time Read by a Flink Opensource SQL Job Different from RDS Database Time?
- What Are the Syntax Differences Between Flink SQL and Flink Opensource SQL?
- Why Does Job Submission Fail When the failure-handler Parameter of the Elasticsearch Result Table for a Flink Opensource SQL Job Is Set to retry_rejected?
-
Flink Jar Jobs
- How Do I Configure Checkpoints for Flink Jar Jobs and Save the Checkpoints to OBS?
- Does a Flink JAR Job Support Configuration File Upload? How Do I Upload a Configuration File?
- Why Does the Submission Fail Due to Flink JAR File Conflict?
- Why Does a Flink Jar Job Fail to Access GaussDB(DWS) and a Message Is Displayed Indicating Too Many Client Connections?
- Why Is Error Message "Authentication failed" Displayed During Flink Jar Job Running?
- Why Is Error Invalid OBS Bucket Name Reported After a Flink Job Submission Failed?
- Why Does the Flink Submission Fail Due to Hadoop JAR File Conflict?
- How Do I Connect a Flink jar Job to SASL_SSL?
- Performance Tuning
-
O&M Guide
- How Do I Locate a Flink Job Submission Error?
- How Do I Locate a Flink Job Running Error?
- How Do I Know Whether a Flink Job Can Be Restored from a Checkpoint After Being Restarted?
- Why Does DIS Stream Not Exist During Job Semantic Check?
- Why Is the OBS Bucket Selected for Job Not Authorized?
- Why Are Logs Not Written to the OBS Bucket After a DLI Flink Job Fails to Be Submitted for Running?
- How Do I Configure Connection Retries for Kafka Sink If it is Disconnected?
- Why Is Information Displayed on the FlinkUI/Spark UI Page Incomplete?
- Why Is the Flink Job Abnormal Due to Heartbeat Timeout Between JobManager and TaskManager?
- Why Is Error "Timeout expired while fetching topic metadata" Repeatedly Reported in Flink JobManager Logs?
-
Usage
-
Problems Related to SQL Jobs
- Usage
-
Job Development
- How Do I Merge Small Files?
- How Do I Use DLI to Access Data in an OBS Bucket?
- How Do I Specify an OBS Path When Creating an OBS Table?
- How Do I Create a Table Using JSON Data in an OBS Bucket?
- How Can I Use the count Function to Perform Aggregation?
- How Do I Synchronize DLI Table Data from One Region to Another?
- How Do I Insert Table Data into Specific Fields of a Table Using a SQL Job?
- How Do I Delete Table Data?
-
Job O&M Errors
- Why Is Error "path obs://xxx already exists" Reported When Data Is Exported to OBS?
- Why Is Error "SQL_ANALYSIS_ERROR: Reference 't.id' is ambiguous, could be: t.id, t.id.;" Displayed When Two Tables Are Joined?
- Why Is Error "The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget." Reported when a SQL Statement Is Executed?
- Why Is Error "There should be at least one partition pruning predicate on partitioned table XX.YYY" Reported When a Query Statement Is Executed?
- Why Is Error "IllegalArgumentException: Buffer size too small. size" Reported When Data Is Loaded to an OBS Foreign Table?
- Why Is Error "DLI.0002 FileNotFoundException" Reported During SQL Job Running?
- Why Is a Schema Parsing Error Reported When I Create a Hive Table Using CTAS?
- Why Is Error "org.apache.hadoop.fs.obs.OBSIOException" Reported When I Run DLI SQL Scripts on DataArts Studio?
- Why Is Error "UQUERY_CONNECTOR_0001:Invoke DLI service api failed" Reported in the Job Log When I Use CDM to Migrate Data to DLI?
- Why Is Error "File not Found" Reported When I Access a SQL Job?
- Why Is Error "DLI.0003: AccessControlException XXX" Reported When I Access a SQL Job?
- Why Is Error "DLI.0001: org.apache.hadoop.security.AccessControlException: verifyBucketExists on {{bucket name}}: status [403]" Reported When I Access a SQL Job?
- Why Is Error "The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget" Reported During SQL Statement Execution? Restricted for no budget.
-
O&M Guide
- How Do I Troubleshoot Slow SQL Jobs?
- How Do I View DLI SQL Logs?
- How Do I View SQL Execution Records?
- How Do I Eliminate Data Skew by Configuring AE Parameters?
- What Can I Do If a Table Cannot Be Queried on the DLI Console?
- The Compression Ratio of OBS Tables Is Too High
- How Can I Avoid Garbled Characters Caused by Inconsistent Character Codes?
- Do I Need to Grant Table Permissions to a User and Project After I Delete a Table and Create One with the Same Name?
- Why Can't I Query Table Data After Data Is Imported to a DLI Partitioned Table Because the File to Be Imported Does Not Contain Data in the Partitioning Column?
- How Do I Fix the Data Error Caused by CRLF Characters in a Field of the OBS File Used to Create an External OBS Table?
- Why Does a SQL Job That Has Join Operations Stay in the Running State?
- The on Clause Is Not Added When Tables Are Joined. Cartesian Product Query Causes High Resource Usage of the Queue, and the Job Fails to Be Executed
- Why Can't I Query Data After I Manually Add Data to the Partition Directory of an OBS Table?
- Why Is All Data Overwritten When insert overwrite Is Used to Overwrite Partitioned Table?
- Why Is a SQL Job Stuck in the Submitting State?
- Why Is the create_date Field in the RDS Table Is a Timestamp in the DLI query result?
- What Can I Do If datasize Cannot Be Changed After the Table Name Is Changed in a Finished SQL Job?
- Why Is the Data Volume Changes When Data Is Imported from DLI to OBS?
-
Problems Related to Spark Jobs
- Usage
-
Job Development
- How Do I Use Spark to Write Data into a DLI Table?
- How Do I Set the AK/SK for a Queue to Operate an OBS Table?
- How Do I View the Resource Usage of DLI Spark Jobs?
- How Do I Use Python Scripts to Access the MySQL Database If the pymysql Module Is Missing from the Spark Job Results Stored in MySQL?
- How Do I Run a Complex PySpark Program in DLI?
- How Does a Spark Job Access a MySQL Database?
- How Do I Use JDBC to Set the spark.sql.shuffle.partitions Parameter to Improve the Task Concurrency?
- How Do I Read Uploaded Files for a Spark Jar Job?
-
Job O&M Errors
- What Can I Do When Receiving java.lang.AbstractMethodError in the Spark Job?
- Why Are Errors "ResponseCode: 403" and "ResponseStatus: Forbidden" Reported When a Spark Job Accesses OBS Data?
- Why Is Error "verifyBucketExists on XXXX: status [403]" Reported When I Use a Spark Job to Access an OBS Bucket That I Have Access Permission?
- Why Is a Job Running Timeout Reported When a Spark Job Runs a Large Amount of Data?
- Why Does the Job Fail to Be Executed and the Log Shows that the File Directory Is Abnormal When I Use a Spark Job to Access Files in SFTP?
- Why Does the Job Fail to Be Executed Due to Insufficient Database and Table Permissions?
- Why Can't I Find the global_temp Database in the Spark 3.x Job Log?
- Why Does the DataSource Syntax Fail to Create an OBS Table in Avro Format When Spark 2.3.x Is Used?
- O&M Guide
-
Product Consultation
-
Usage
- What Is DLI?
- What Are the Application Scenarios of DLI?
- Which Data Formats Does DLI Support?
- What Are the Differences Between DLI Flink and MRS Flink?
- What Are the Differences Between MRS Spark and DLI Spark?
- Where Can DLI Data Be Stored?
- What Are the Differences Between DLI Tables and OBS Tables?
- How Can I Use DLI If Data Is Not Uploaded to OBS?
- Can I Import OBS Bucket Data Shared by Other Tenants into DLI?
- Why Is Error "Failed to create the database. {"error_code":"DLI.1028";"error_msg":"Already reached the maximum quota of databases:XXX"." Reported?
- Can a Member Account Use Global Variables Created by Other Member Accounts?
- Job Management
- Privacy and Security
-
Usage
-
Billing
- What Is the Billing Mode of DLI?
- Can I Change a Yearly/Monthly Queue to a Pay-per-Use Queue?
- Can I Change a Pay-Per-Use Queue to a Yearly/Monthly Queue?
- When Is a Data Lake Queue Idle?
- How Do I Check the Billing?
- What Is the Difference Between the Following Two Payment Modes: One Is to Purchase 4,000-CU Resources for Three Months at a Time, the Other Is to Purchase 4,000-CU Resources for One Month for Three Times?
- How Do I Purchase 30-CU Resources?
- How Will I Be Billed if Only Two CUs Are Used to Run a Flink Job on a Queue of 16 CUs?
- Do I Need to Pay Extra Fees for Purchasing a Queue Billed Based on the Scanned Data Volume?
- Quota
-
Permission
- Usage
-
O&M Guide
- Why Does My Account Have Insufficient Permissions Due to Arrears?
- Why Does the System Display a Message Indicating Insufficient Permissions When I Update a Program Package?
- Why Is Error "DLI.0003: Permission denied for resource..." Reported When I Run a SQL Statement?
- Why Can't I Query Table Data After I've Been Granted Table Permissions?
- Will an Error Be Reported if the Inherited Permissions Are Regranted to a Table That Inherits Database Permissions?
- Why Can't I Query a View After I'm Granted the Select Table Permission on the View?
-
Queue
- Usage
-
O&M Guide
- How Do I View DLI Queue Load?
- How Do I Determine Whether There Are Too Many Jobs in the Current Queue?
- How Do I Switch an Earlier-Version Spark Queue to a General-Purpose Queue?
- Why Cannot I View the Resource Running Status of DLI Queues on Cloud Eye?
- How Do I Allocate Queue Resources for Running Spark Jobs If I Have Purchased 64 CUs?
- Why Is Error "Queue plans create failed. The plan xxx target cu is out of quota" Reported When I Schedule CU Changes?
- Why Is a Timeout Exception Reported When a DLI SQL Statement Fails to Be Executed on the Default Queue?
-
Datasource Connections
-
Datasource Connections
- Why Do I Need to Create a VPC Peering Connection for an Enhanced Datasource Connection?
- Failed to Bind a Queue to an Enhanced Datasource Connection
- DLI Failed to Connect to GaussDB(DWS) Through an Enhanced Datasource Connection
- How Do I Do if the Datasource Connection Is Created But the Network Connectivity Test Fails?
- How Do I Configure the Network Between a DLI Queue and a Data Source?
- What Can I Do If a Datasource Connection Is Stuck in Creating State When I Try to Bind a Queue to It?
- How Do I Bind a Datasource Connection to a Pay-per-Use Queue?
- How Do I Connect DLI to Data Sources?
- Cross-Source Analysis
-
Datasource Connection O&M
- Why Is the Error Message "communication link failure" Displayed When I Use a Newly Activated Datasource Connection?
- Connection Times Out During MRS HBase Datasource Connection, and No Error Is Recorded in Logs
- Why Can't I Find the Subnet When Creating a DLI Datasource Connection?
- Error Message "Incorrect string value" Is Displayed When insert overwrite Is Executed on a Datasource RDS Table
- Null Pointer Error Is Displayed When the System Creates a Datasource RDS Table
- Error Message "org.postgresql.util.PSQLException: ERROR: tuple concurrently updated" Is Displayed When the System Executes insert overwrite on a Datasource GaussDB(DWS) Table
- RegionTooBusyException Is Reported When Data Is Imported to a CloudTable HBase Table Through a Datasource Table
- A Null Value Is Written Into a Non-Null Field When a DLI Datasource Connection Is Used to Connect to a GaussDB(DWS) Table
- An Insert Operation Failed After the Schema of the GaussDB(DWS) Source Table Is Updated
-
Datasource Connections
-
APIs
- How Do I Obtain the AK/SK Pair?
- How Do I Obtain the Project ID?
- Why Is Error "unsupported media Type" Reported When I Subimt a SQL Job?
- Are Project IDs of Different Accounts the Same When They Are Used to Call APIs?
- What Can I Do If an Error Is Reported When the Execution of the API for Creating a SQL Job Times Out?
- What Can I Do If Chinese Characters Returned by an API Are Garbled Characters?
- SDKs
- Change History
-
Flink Jobs
- Videos
-
SQL Syntax Reference (To Be Offline)
- Notice on Taking This Syntax Reference Offline
-
Spark SQL Syntax Reference (Unavailable Soon)
- Common Configuration Items of Batch SQL Jobs
- SQL Syntax Overview of Batch Jobs
- Databases
- Creating an OBS Table
- Creating a DLI Table
- Deleting a Table
- Viewing Tables
- Modifying a Table
-
Syntax for Partitioning a Table
- Adding Partition Data (Only OBS Tables Supported)
- Renaming a Partition (Only OBS Tables Supported)
- Deleting a Partition
- Deleting Partitions by Specifying Filter Criteria (Only OBS Tables Supported)
- Altering the Partition Location of a Table (Only OBS Tables Supported)
- Updating Partitioned Table Data (Only OBS Tables Supported)
- Updating Table Metadata with REFRESH TABLE
- Importing Data to the Table
- Inserting Data
- Clearing Data
- Exporting Search Results
- Backing Up and Restoring Data of Multiple Versions
- Table Lifecycle Management
- Creating a Datasource Connection with an HBase Table
- Creating a Datasource Connection with an OpenTSDB Table
- Creating a Datasource Connection with a DWS table
- Creating a Datasource Connection with an RDS Table
- Creating a Datasource Connection with a CSS Table
- Creating a Datasource Connection with a DCS Table
- Creating a Datasource Connection with a DDS Table
- Creating a Datasource Connection with an Oracle Table
- Views
- Viewing the Execution Plan
- Data Permissions Management
- Data Types
- User-Defined Functions
-
Built-in Functions
-
Date Functions
- Overview
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
String Functions
- Overview
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- Mathematical Functions
- Aggregate Functions
- Window Functions
- Other Functions
-
Date Functions
- Basic SELECT Statements
- Filtering
- Sorting
- Grouping
- JOIN
- Subquery
- Alias
- Set Operations
- WITH...AS
- CASE...WHEN
- OVER Clause
- Flink OpenSource SQL 1.12 Syntax Reference
-
Flink Opensource SQL 1.10 Syntax Reference
- Constraints and Definitions
- Flink OpenSource SQL 1.10 Syntax
-
Data Definition Language (DDL)
- Creating a Source Table
-
Creating a Result Table
- ClickHouse Result Table
- Kafka Result Table
- Upsert Kafka Result Table
- DIS Result Table
- JDBC Result Table
- GaussDB(DWS) Result Table
- Redis Result Table
- SMN Result Table
- HBase Result Table
- Elasticsearch Result Table
- OpenTSDB Result Table
- User-defined Result Table
- Print Result Table
- File System Result Table
- Creating a Dimension Table
- Data Manipulation Language (DML)
- Functions
-
Historical Versions (Unavailable Soon)
-
Flink SQL Syntax
- SQL Syntax Constraints and Definitions
- SQL Syntax Overview of Stream Jobs
- Creating a Source Stream
-
Creating a Sink Stream
- CloudTable HBase Sink Stream
- CloudTable OpenTSDB Sink Stream
- MRS OpenTSDB Sink Stream
- CSS Elasticsearch Sink Stream
- DCS Sink Stream
- DDS Sink Stream
- DIS Sink Stream
- DMS Sink Stream
- DWS Sink Stream (JDBC Mode)
- DWS Sink Stream (OBS-based Dumping)
- MRS HBase Sink Stream
- MRS Kafka Sink Stream
- Open-Source Kafka Sink Stream
- File System Sink Stream (Recommended)
- OBS Sink Stream
- RDS Sink Stream
- SMN Sink Stream
- Creating a Temporary Stream
- Creating a Dimension Table
- Custom Stream Ecosystem
- Data Type
- Built-In Functions
- User-Defined Functions
- Geographical Functions
- SELECT
- Condition Expression
- Window
- JOIN Between Stream Data and Table Data
- Configuring Time Models
- Pattern Matching
- StreamingML
- Reserved Keywords
-
Flink SQL Syntax
-
Identifiers
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- input_expression
- join_condition
- non_equi_join_condition
- number
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- select_statement
- separator
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- when_expression
- where_condition
- window_function
- Operators
- Change History
Calling UDTFs in Spark SQL Jobs
Scenario
You can use Hive User-Defined Table-Generating Functions (UDTF) to customize table-valued functions. Hive UDTFs are used for the one-in-multiple-out data operations. UDTF reads a row of data and output multiple values.
Constraints
- To perform UDTF-related operations on DLI, you need to create a SQL queue instead of using the default queue.
- When UDTFs are used by multiple accounts, other users, except the user who creates them, need to be authorized before using the UDTF. The authorization operations are as follows:
Log in to the DLI console and choose Data Management > Package Management. On the displayed page, select your UDTF Jar package and click Manage Permissions in the Operation column. On the permission management page, click Grant Permission in the upper right corner and select the required permissions.
- If you use a static class or interface in a UDF, add try catch to capture exceptions. Otherwise, package conflicts may occur.
Environment Preparations
Before you start, set up the development environment.
Item |
Description |
---|---|
OS |
Windows 7 or later |
JDK |
JDK 1.8. |
IntelliJ IDEA |
This tool is used for application development. The version of the tool must be 2019.1 or other compatible versions. |
Maven |
Basic configurations of the development environment. Maven is used for project management throughout the lifecycle of software development. |
Development Process

No. |
Phase |
Software Portal |
Description |
---|---|---|---|
1 |
Create a Maven project and configure the POM file. |
IntelliJ IDEA |
Write UDTF code by referring the steps in Procedure. |
2 |
Write UDTF code. |
||
3 |
Debug, compile, and pack the code into a Jar package. |
||
4 |
Upload the Jar package to OBS. |
OBS console |
Upload the UDTF Jar file to an OBS directory. |
5 |
Create the UDTF on DLI. |
DLI console |
Create a UDTF on the SQL job management page of the DLI console. |
6 |
Verify and use the UDTF on DLI. |
DLI console |
Use the UDTF in your DLI job. |
Procedure
- Create a Maven project and configure the POM file. This step uses IntelliJ IDEA 2020.2 as an example.
- Start IntelliJ IDEA and choose File > New > Project.
Figure 2 Creating a project
- Choose Maven, set Project SDK to 1.8, and click Next.
Figure 3 Choosing Maven
- Set the project name, configure the storage path, and click Finish.
Figure 4 Creating a project
- Add the following content to the pom.xml file.
<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> </dependencies>
Figure 5 Adding configurations to the POM file - Choose src > main and right-click the java folder. Choose New > Package to create a package and a class file.
Figure 6 Creating a package and a class file
Set the package name as you need. In this example, set Package to com.huawei.demo. Then, press Enter.
Figure 7 Customizing a packageCreate a Java Class file in the package path. In this example, the Java Class file is UDTFSplit.
Figure 8 Creating a Java class file
- Start IntelliJ IDEA and choose File > New > Project.
- Write UDTF code. For sample code, see Sample Code.
The UDTF class must inherit org.apache.hadoop.hive.ql.udf.generic.GenericUDTF to implement the initialize, process, and close methods.
- Call the initialize method in the UDTF. This method returns the information about the returned data rows of the UDTF, such as the number and type.
- Call the process method to process data. Each time forward() is called in the process method, a row is generated.
If multiple columns are generated, you can put the values in an array and pass the array to the forward() function.
public void process(Object[] args) throws HiveException { // TODO Auto-generated method stub if(args.length == 0){ return; } String input = args[0].toString(); if(StringUtils.isEmpty(input)){ return; } String[] test = input.split(";"); for (int i = 0; i < test.length; i++) { try { String[] result = test[i].split(":"); forward(result); } catch (Exception e) { continue; } } }
- Call the close method to clear methods that need to be closed.
- Use IntelliJ IDEA to compile the code and pack it into the JAR package.
- Click Maven in the tool bar on the right, and click clean and compile to compile the code.
After the compilation is successful, click package.Figure 9 Compiling and packaging
The generated JAR package is stored in the target directory. In this example, MyUDTF-1.0-SNAPSHOT.jar is stored in D:\MyUDTF\target.
Figure 10 Generating a JAR file
- Click Maven in the tool bar on the right, and click clean and compile to compile the code.
- Log in to the OBS console and upload the file to the OBS path.
NOTE:
The region of the OBS bucket to which the Jar package is uploaded must be the same as the region of the DLI queue. Cross-region operations are not allowed.
- (Optional) Upload the file to DLI for package management.
- Log in to the DLI management console and choose Data Management > Package Management.
- On the Package Management page, click Create in the upper right corner.
- In the Create Package dialog, set the following parameters:
- Type: Select JAR.
- OBS Path: Specify the OBS path for storing the package.
- Set Group and Group Name as required for package identification and management.
- Click OK.
Figure 11 Creating a package
- Create the UDTF on DLI.
- Log in to the DLI console, choose SQL Editor. Set Engine to spark, and select the created SQL queue and database.
Figure 12 Selecting the queue and database
- In the SQL editing area, enter the path of the JAR file to be uploaded to create a UDTF and click Execute.
CREATE FUNCTION mytestsplit AS 'com.huawei.demo.UDTFSplit' using jar 'obs://dli-test-obs01/MyUDTF-1.0-SNAPSHOT.jar';
- Log in to the DLI console, choose SQL Editor. Set Engine to spark, and select the created SQL queue and database.
- Restart the original SQL queue for the added function to take effect.
- Log in to the DLI management console and choose Resources > Queue Management from the navigation pane. In the Operation column of the SQL queue job, click Restart.
- In the Restart dialog box, click OK.
- Verify and use the UDTF on DLI.
Use the UDTF created in 6 in the SELECT statement as follows:
select mytestsplit('abc:123\;efd:567\;utf:890');
Figure 13 Execution result - (Optional) Delete the UDTF.
If this function is no longer used, run the following statement to delete the function:
Drop FUNCTION mytestsplit;
Sample Code
The complete UDTFSplit.java code is as follows:
import java.util.ArrayList; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; public class UDTFSplit extends GenericUDTF { @Override public void close() throws HiveException { // TODO Auto-generated method stub } @Override public void process(Object[] args) throws HiveException { // TODO Auto-generated method stub if(args.length == 0){ return; } String input = args[0].toString(); if(StringUtils.isEmpty(input)){ return; } String[] test = input.split(";"); for (int i = 0; i < test.length; i++) { try { String[] result = test[i].split(":"); forward(result); } catch (Exception e) { continue; } } } @Override public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { if (args.length != 1) { throw new UDFArgumentLengthException("ExplodeMap takes only one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("ExplodeMap takes string as a parameter"); } ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); fieldNames.add("col1"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("col2"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.