- What's New
- Function Overview
- Product Bulletin
- Service Overview
-
Billing
- Billing Overview
- Billing for Compute Resources
- Billing for Storage Resources
- Billing for Scanned Data
- Yearly/Monthly Billing
- Billing Examples
- Renewing Subscriptions
- Bills
- Arrears
- Billing Termination
-
Billing FAQ
- What Billing Modes Does DLI Offer?
- Would a Pay-Per-Use DLI Queue Not Be Billed if No Job Is Submitted for Execution?
- When Is a Data Lake Queue Idle?
- How Do I Troubleshoot DLI Billing Issues?
- Why Am I Still Being Billed on a Pay-per-Use Basis After I Purchased a Yearly/Monthly Package?
- How Do I View the Usage of a Package?
- How Do I Purchase a 30-CU Queue?
- How Will I Be Billed If I Purchased a Pay-per-Use 16-CU DLI Queue but Only Used 2 CUs to Run Jobs?
- How Do I View a Job's Scanned Data Volume?
- Would a Pay-Per-Use Elastic Resource Pool Not Be Billed if No Job Is Submitted for Execution?
- Do I Need to Pay Extra Fees for Purchasing a Queue Billed Based on the Scanned Data Volume?
- How Is the Usage Beyond the Package Limit Billed?
- What Are the Actual CUs, CU Range, and Specifications of an Elastic Resource Pool?
- Change History
- Getting Started
-
User Guide
- DLI Console Overview
-
Creating an Elastic Resource Pool and Queues Within It
- Overview of DLI Elastic Resource Pools and Queues
- Creating an Elastic Resource Pool
- Creating Queues in an Elastic Resource Pool
- Creating a Standard Queue (Discarded and Not Recommended)
- Managing Elastic Resource Pools
-
Managing Queues
- Queue Permission Management
- Adjusting Scaling Policies for Queues in an Elastic Resource Pool
- Allocating a Queue to an Enterprise Project
- Creating an SMN Topic
- Managing Queue Tags
- Setting Queue Properties
- Testing Address Connectivity
- Modifying Queue Specifications
- Deleting a Queue
- Elastic Scaling of Standard Queues (Discarded and Not Recommended)
- Setting a Scheduled Elastic Scaling Task for a Standard Queue (Discarded and Not Recommended)
- Changing the CIDR Block of a Standard Queue (Discarded and Not Recommended)
- Example Use Case: Creating an Elastic Resource Pool and Running Jobs
- Example Use Case: Configuring Scaling Policies for Queues in an Elastic Resource Pool
- Job Management
- Data Management
-
Enhanced Datasource Connections
- Overview
- Cross-Source Analysis Development Methods
- Creating an Enhanced Datasource Connection
- Deleting an Enhanced Datasource Connection
- Modifying Host Information in an Elastic Resource Pool
- Binding and Unbinding a Queue
- Adding a Route
- Deleting a Route
- Enhanced Connection Permission Management
- Enhanced Datasource Connection Tag Management
- Datasource Authentication
- Job Templates
- DLI Agency Permissions
- Creating a DLI Global Variable
- Configuring a DLI Job Bucket
- SQL Inspector
- Creating a Custom Image
- Permissions Management
- Other Common Operations
- Permission Management for Global Variables
- Best Practices
-
Developer Guide
- SQL Jobs
-
Flink OpenSource SQL Jobs
- Reading Data from Kafka and Writing Data to RDS
- Reading Data from Kafka and Writing Data to GaussDB(DWS)
- Reading Data from Kafka and Writing Data to Elasticsearch
- Reading Data from MySQL CDC and Writing Data to GaussDB(DWS)
- Reading Data from PostgreSQL CDC and Writing Data to GaussDB(DWS)
- Configuring High-Reliability Flink Jobs (Automatic Restart upon Exceptions)
- Flink Jar Jobs
-
Spark Jar Jobs
- Using Spark Jar Jobs to Read and Query OBS Data
- Using the Spark Job to Access DLI Metadata
- Using Spark-submit to Submit a Spark Jar Job
- Submitting a Spark Jar Job Using Livy
- Using Spark Jobs to Access Data Sources of Datasource Connections
- Change History
-
Spark SQL Syntax Reference
- Common Configuration Items
- Spark SQL Syntax
- Spark Open Source Commands
- Databases
-
Tables
- Creating an OBS Table
- Creating a DLI Table
- Deleting a Table
- Viewing a Table
- Modifying a Table
-
Partition-related Syntax
- Adding Partition Data (Only OBS Tables Supported)
- Renaming a Partition (Only OBS Tables Supported)
- Deleting a Partition
- Deleting Partitions by Specifying Filter Criteria (Only Supported on OBS Tables)
- Altering the Partition Location of a Table (Only OBS Tables Supported)
- Updating Partitioned Table Data (Only OBS Tables Supported)
- Updating Table Metadata with REFRESH TABLE
- Backing Up and Restoring Data of Multiple Versions
- Table Lifecycle Management
- Data
- Exporting Query Results
-
Datasource Connections
- Creating a Datasource Connection with an HBase Table
- Creating a Datasource Connection with an OpenTSDB Table
- Creating a Datasource Connection with a DWS Table
- Creating a Datasource Connection with an RDS Table
- Creating a Datasource Connection with a CSS Table
- Creating a Datasource Connection with a DCS Table
- Creating a Datasource Connection with a DDS Table
- Creating a Datasource Connection with an Oracle Table
- Views
- Viewing the Execution Plan
- Data Permissions
- Data Types
- User-Defined Functions
-
Built-In Functions
-
Date Functions
- Overview
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
String Functions
- Overview
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- Mathematical Functions
- Aggregate Functions
- Window Functions
- Other Functions
-
Date Functions
- SELECT
-
Identifiers
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- class_name
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- hdfs_path
- input_expression
- input_format_classname
- jar_path
- join_condition
- non_equi_join_condition
- number
- num_buckets
- output_format_classname
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- row_format
- select_statement
- separator
- serde_name
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- view_name
- view_properties
- when_expression
- where_condition
- window_function
- Operators
-
Flink SQL Syntax Reference
-
Flink OpenSource SQL 1.15 Syntax Reference
- Constraints and Definitions
- Overview
- Flink OpenSource SQL 1.15 Usage
- Formats
- Connectors
- DML Snytax
-
Functions
- UDFs
- Type Inference
- Parameter Transfer
-
Built-In Functions
- Comparison Functions
- Logical Functions
- Arithmetic Functions
- String Functions
- Temporal Functions
- Conditional Functions
- Type Conversion Functions
- Collection Functions
- JSON Functions
- Value Construction Functions
- Value Retrieval Functions
- Grouping Functions
- Hash Functions
- Aggregate Functions
- Table-Valued Functions
- Flink OpenSource SQL 1.12 Syntax Reference
-
Flink Opensource SQL 1.10 Syntax Reference
- Constraints and Definitions
- Flink OpenSource SQL 1.10 Syntax
-
Data Definition Language (DDL)
- Creating a Source Table
-
Creating a Result Table
- ClickHouse Result Table
- Kafka Result Table
- Upsert Kafka Result Table
- DIS Result Table
- JDBC Result Table
- GaussDB(DWS) Result Table
- Redis Result Table
- SMN Result Table
- HBase Result Table
- Elasticsearch Result Table
- OpenTSDB Result Table
- User-defined Result Table
- Print Result Table
- File System Result Table
- Creating a Dimension Table
- Data Manipulation Language (DML)
- Functions
-
Historical Version
-
Flink SQL Syntax (This Syntax Will Not Evolve. Use FlinkOpenSource SQL Instead.)
- Constraints and Definitions
- Overview
- Creating a Source Stream
-
Creating a Sink Stream
- CloudTable HBase Sink Stream
- CloudTable OpenTSDB Sink Stream
- MRS OpenTSDB Sink Stream
- CSS Elasticsearch Sink Stream
- DCS Sink Stream
- DDS Sink Stream
- DIS Sink Stream
- DMS Sink Stream
- DWS Sink Stream (JDBC Mode)
- DWS Sink Stream (OBS-based Dumping)
- MRS HBase Sink Stream
- MRS Kafka Sink Stream
- Open-Source Kafka Sink Stream
- File System Sink Stream (Recommended)
- OBS Sink Stream
- RDS Sink Stream
- SMN Sink Stream
- Creating a Temporary Stream
- Creating a Dimension Table
- Custom Stream Ecosystem
- Data Manipulation Language (DML)
- Data Types
- User-Defined Functions
- Built-In Functions
- Geographical Functions
- Configuring Time Models
- Pattern Matching
- StreamingML
- Reserved Keywords
-
Flink SQL Syntax (This Syntax Will Not Evolve. Use FlinkOpenSource SQL Instead.)
-
Flink OpenSource SQL 1.15 Syntax Reference
-
API Reference
- Before You Start
- Overview
- Calling APIs
- Getting Started
- Permission-related APIs
- Global Variable-related APIs
- APIs Related to Enhanced Datasource Connections
-
APIs Related to Elastic Resource Pools
- Creating an Elastic Resource Pool
- Querying All Elastic Resource Pools
- Deleting an Elastic Resource Pool
- Modifying Elastic Resource Pool Information
- Querying All Queues in an Elastic Resource Pool
- Associating a Queue with an Elastic Resource Pool
- Viewing Scaling History of an Elastic Resource Pool
- Modifying the Scaling Policy of a Queue Associated with an Elastic Resource Pool
- Queue-related APIs (Recommended)
- SQL Job-related APIs
- SQL Template-related APIs
-
Flink Job-related APIs
- Creating a SQL Job
- Updating a SQL Job
- Creating a Flink Jar job
- Updating a Flink Jar Job
- Running Jobs in Batches
- Listing Jobs
- Querying Job Details
- Querying the Job Execution Plan
- Stopping Jobs in Batches
- Deleting a Job
- Deleting Jobs in Batches
- Exporting a Flink Job
- Importing a Flink Job
- Generating a Static Stream Graph for a Flink SQL Job
- APIs Related to Flink Job Templates
- Spark Job-related APIs
- APIs Related to Spark Job Templates
- Permissions Policies and Supported Actions
-
Out-of-Date APIs
- Agency-related APIs (Discarded)
-
Package Group-related APIs (Discarded)
- Uploading a Package Group (Discarded)
- Listing Package Groups (Discarded)
- Uploading a JAR Package Group (Discarded)
- Uploading a PyFile Package Group (Discarded)
- Uploading a File Package Group (Discarded)
- Querying Resource Packages in a Group (Discarded)
- Deleting a Resource Package from a Group (Discarded)
- Changing the Owner of a Group or Resource Package (Discarded)
- APIs Related to Spark Batch Processing (Discarded)
- SQL Job-related APIs (Discarded)
- Resource-related APIs (Discarded)
- Permission-related APIs (Discarded)
- Queue-related APIs (Discarded)
- Datasource Authentication-related APIs (Discarded)
- APIs Related to Enhanced Datasource Connections (Discarded)
- Template-related APIs (Discarded)
- APIs Related to Flink Jobs (Discarded)
- Public Parameters
- SDK Reference
-
FAQs
-
Flink Jobs
-
Usage
- What Data Formats and Data Sources Are Supported by DLI Flink Jobs?
- How Do I Authorize a Subuser to View Flink Jobs?
- How Do I Set Auto Restart upon Exception for a Flink Job?
- How Do I Save Flink Job Logs?
- How Can I Check Flink Job Results?
- Why Is Error "No such user. userName:xxxx." Reported on the Flink Job Management Page When I Grant Permission to a User?
- How Do I Know Which Checkpoint the Flink Job I Stopped Will Be Restored to When I Start the Job Again?
- Which Flink Version Does DLI Support? Is Flink 1.13 Supported? Which Version Is the Next?
- Why Is a Message Displayed Indicating That the SMN Topic Does Not Exist When I Use the SMN Topic in DLI?
-
Flink SQL
- How Much Data Can Be Processed in a Day by a Flink SQL Job?
- Does Data in the Temporary Stream of Flink SQL Need to Be Cleared Periodically? How Do I Clear the Data?
- Why Is a Message Displayed Indicating That the OBS Bucket Is Not Authorized When I Select an OBS Bucket for a Flink SQL Job?
- How Do I Create an OBS Partitioned Table for a Flink SQL Job?
- How Do I Change the Number of Kafka Partitions of a Flink SQL Job Without Stopping It?
- How Do I Dump Data to OBS and Create an OBS Partitioned Table?
- Why Is Error Message "DLI.0005" Displayed When I Use an EL Expression to Create a Table in a Flink SQL Job?
- Why Is No Data Queried in the DLI Table Created Using the OBS File Path When Data Is Written to OBS by a Flink Job Output Stream?
- Why Does a Flink SQL Job Fails to Be Executed, and Is "connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null" Displayed in the Log?
- Why Is Error "Not authorized" Reported When a Flink SQL Job Reads DIS Data?
- Data Writing Fails After a Flink SQL Job Consumed Kafka and Sank Data to the Elasticsearch Cluster
- How Does Flink Opensource SQL Parse Nested JSON?
- Why Is the RDS Database Time Read by a Flink Opensource SQL Job Different from RDS Database Time?
- What Are the Syntax Differences Between Flink SQL and Flink Opensource SQL?
- Why Does Job Submission Fail When the failure-handler Parameter of the Elasticsearch Result Table for a Flink Opensource SQL Job Is Set to retry_rejected?
-
Flink Jar Jobs
- How Do I Configure Checkpoints for Flink Jar Jobs and Save the Checkpoints to OBS?
- Does a Flink JAR Job Support Configuration File Upload? How Do I Upload a Configuration File?
- Why Does the Submission Fail Due to Flink JAR File Conflict?
- Why Does a Flink Jar Job Fail to Access GaussDB(DWS) and a Message Is Displayed Indicating Too Many Client Connections?
- Why Is Error Message "Authentication failed" Displayed During Flink Jar Job Running?
- Why Is Error Invalid OBS Bucket Name Reported After a Flink Job Submission Failed?
- Why Does the Flink Submission Fail Due to Hadoop JAR File Conflict?
- How Do I Connect a Flink jar Job to SASL_SSL?
- Performance Tuning
-
O&M Guide
- How Do I Locate a Flink Job Submission Error?
- How Do I Locate a Flink Job Running Error?
- How Do I Know Whether a Flink Job Can Be Restored from a Checkpoint After Being Restarted?
- Why Does DIS Stream Not Exist During Job Semantic Check?
- Why Is the OBS Bucket Selected for Job Not Authorized?
- Why Are Logs Not Written to the OBS Bucket After a DLI Flink Job Fails to Be Submitted for Running?
- How Do I Configure Connection Retries for Kafka Sink If it is Disconnected?
- Why Is Information Displayed on the FlinkUI/Spark UI Page Incomplete?
- Why Is the Flink Job Abnormal Due to Heartbeat Timeout Between JobManager and TaskManager?
- Why Is Error "Timeout expired while fetching topic metadata" Repeatedly Reported in Flink JobManager Logs?
-
Usage
-
Problems Related to SQL Jobs
- Usage
-
Job Development
- How Do I Merge Small Files?
- How Do I Use DLI to Access Data in an OBS Bucket?
- How Do I Specify an OBS Path When Creating an OBS Table?
- How Do I Create a Table Using JSON Data in an OBS Bucket?
- How Can I Use the count Function to Perform Aggregation?
- How Do I Synchronize DLI Table Data from One Region to Another?
- How Do I Insert Table Data into Specific Fields of a Table Using a SQL Job?
- How Do I Delete Table Data?
-
Job O&M Errors
- Why Is Error "path obs://xxx already exists" Reported When Data Is Exported to OBS?
- Why Is Error "SQL_ANALYSIS_ERROR: Reference 't.id' is ambiguous, could be: t.id, t.id.;" Displayed When Two Tables Are Joined?
- Why Is Error "The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget." Reported when a SQL Statement Is Executed?
- Why Is Error "There should be at least one partition pruning predicate on partitioned table XX.YYY" Reported When a Query Statement Is Executed?
- Why Is Error "IllegalArgumentException: Buffer size too small. size" Reported When Data Is Loaded to an OBS Foreign Table?
- Why Is Error "DLI.0002 FileNotFoundException" Reported During SQL Job Running?
- Why Is a Schema Parsing Error Reported When I Create a Hive Table Using CTAS?
- Why Is Error "org.apache.hadoop.fs.obs.OBSIOException" Reported When I Run DLI SQL Scripts on DataArts Studio?
- Why Is Error "UQUERY_CONNECTOR_0001:Invoke DLI service api failed" Reported in the Job Log When I Use CDM to Migrate Data to DLI?
- Why Is Error "File not Found" Reported When I Access a SQL Job?
- Why Is Error "DLI.0003: AccessControlException XXX" Reported When I Access a SQL Job?
- Why Is Error "DLI.0001: org.apache.hadoop.security.AccessControlException: verifyBucketExists on {{bucket name}}: status [403]" Reported When I Access a SQL Job?
- Why Is Error "The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget" Reported During SQL Statement Execution? Restricted for no budget.
-
O&M Guide
- How Do I Troubleshoot Slow SQL Jobs?
- How Do I View DLI SQL Logs?
- How Do I View SQL Execution Records?
- How Do I Eliminate Data Skew by Configuring AE Parameters?
- What Can I Do If a Table Cannot Be Queried on the DLI Console?
- The Compression Ratio of OBS Tables Is Too High
- How Can I Avoid Garbled Characters Caused by Inconsistent Character Codes?
- Do I Need to Grant Table Permissions to a User and Project After I Delete a Table and Create One with the Same Name?
- Why Can't I Query Table Data After Data Is Imported to a DLI Partitioned Table Because the File to Be Imported Does Not Contain Data in the Partitioning Column?
- How Do I Fix the Data Error Caused by CRLF Characters in a Field of the OBS File Used to Create an External OBS Table?
- Why Does a SQL Job That Has Join Operations Stay in the Running State?
- The on Clause Is Not Added When Tables Are Joined. Cartesian Product Query Causes High Resource Usage of the Queue, and the Job Fails to Be Executed
- Why Can't I Query Data After I Manually Add Data to the Partition Directory of an OBS Table?
- Why Is All Data Overwritten When insert overwrite Is Used to Overwrite Partitioned Table?
- Why Is a SQL Job Stuck in the Submitting State?
- Why Is the create_date Field in the RDS Table Is a Timestamp in the DLI query result?
- What Can I Do If datasize Cannot Be Changed After the Table Name Is Changed in a Finished SQL Job?
- Why Is the Data Volume Changes When Data Is Imported from DLI to OBS?
-
Problems Related to Spark Jobs
- Usage
-
Job Development
- How Do I Use Spark to Write Data into a DLI Table?
- How Do I Set the AK/SK for a Queue to Operate an OBS Table?
- How Do I View the Resource Usage of DLI Spark Jobs?
- How Do I Use Python Scripts to Access the MySQL Database If the pymysql Module Is Missing from the Spark Job Results Stored in MySQL?
- How Do I Run a Complex PySpark Program in DLI?
- How Does a Spark Job Access a MySQL Database?
- How Do I Use JDBC to Set the spark.sql.shuffle.partitions Parameter to Improve the Task Concurrency?
- How Do I Read Uploaded Files for a Spark Jar Job?
-
Job O&M Errors
- What Can I Do When Receiving java.lang.AbstractMethodError in the Spark Job?
- Why Are Errors "ResponseCode: 403" and "ResponseStatus: Forbidden" Reported When a Spark Job Accesses OBS Data?
- Why Is Error "verifyBucketExists on XXXX: status [403]" Reported When I Use a Spark Job to Access an OBS Bucket That I Have Access Permission?
- Why Is a Job Running Timeout Reported When a Spark Job Runs a Large Amount of Data?
- Why Does the Job Fail to Be Executed and the Log Shows that the File Directory Is Abnormal When I Use a Spark Job to Access Files in SFTP?
- Why Does the Job Fail to Be Executed Due to Insufficient Database and Table Permissions?
- Why Can't I Find the global_temp Database in the Spark 3.x Job Log?
- Why Does the DataSource Syntax Fail to Create an OBS Table in Avro Format When Spark 2.3.x Is Used?
- O&M Guide
-
Product Consultation
-
Usage
- What Is DLI?
- What Are the Application Scenarios of DLI?
- Which Data Formats Does DLI Support?
- What Are the Differences Between DLI Flink and MRS Flink?
- What Are the Differences Between MRS Spark and DLI Spark?
- Where Can DLI Data Be Stored?
- What Are the Differences Between DLI Tables and OBS Tables?
- How Can I Use DLI If Data Is Not Uploaded to OBS?
- Can I Import OBS Bucket Data Shared by Other Tenants into DLI?
- Why Is Error "Failed to create the database. {"error_code":"DLI.1028";"error_msg":"Already reached the maximum quota of databases:XXX"." Reported?
- Can a Member Account Use Global Variables Created by Other Member Accounts?
- Job Management
- Privacy and Security
-
Usage
-
Billing
- What Is the Billing Mode of DLI?
- Can I Change a Yearly/Monthly Queue to a Pay-per-Use Queue?
- Can I Change a Pay-Per-Use Queue to a Yearly/Monthly Queue?
- When Is a Data Lake Queue Idle?
- How Do I Check the Billing?
- What Is the Difference Between the Following Two Payment Modes: One Is to Purchase 4,000-CU Resources for Three Months at a Time, the Other Is to Purchase 4,000-CU Resources for One Month for Three Times?
- How Do I Purchase 30-CU Resources?
- How Will I Be Billed if Only Two CUs Are Used to Run a Flink Job on a Queue of 16 CUs?
- Do I Need to Pay Extra Fees for Purchasing a Queue Billed Based on the Scanned Data Volume?
- Quota
-
Permission
- Usage
-
O&M Guide
- Why Does My Account Have Insufficient Permissions Due to Arrears?
- Why Does the System Display a Message Indicating Insufficient Permissions When I Update a Program Package?
- Why Is Error "DLI.0003: Permission denied for resource..." Reported When I Run a SQL Statement?
- Why Can't I Query Table Data After I've Been Granted Table Permissions?
- Will an Error Be Reported if the Inherited Permissions Are Regranted to a Table That Inherits Database Permissions?
- Why Can't I Query a View After I'm Granted the Select Table Permission on the View?
-
Queue
- Usage
-
O&M Guide
- How Do I View DLI Queue Load?
- How Do I Determine Whether There Are Too Many Jobs in the Current Queue?
- How Do I Switch an Earlier-Version Spark Queue to a General-Purpose Queue?
- Why Cannot I View the Resource Running Status of DLI Queues on Cloud Eye?
- How Do I Allocate Queue Resources for Running Spark Jobs If I Have Purchased 64 CUs?
- Why Is Error "Queue plans create failed. The plan xxx target cu is out of quota" Reported When I Schedule CU Changes?
- Why Is a Timeout Exception Reported When a DLI SQL Statement Fails to Be Executed on the Default Queue?
-
Datasource Connections
-
Datasource Connections
- Why Do I Need to Create a VPC Peering Connection for an Enhanced Datasource Connection?
- Failed to Bind a Queue to an Enhanced Datasource Connection
- DLI Failed to Connect to GaussDB(DWS) Through an Enhanced Datasource Connection
- How Do I Do if the Datasource Connection Is Created But the Network Connectivity Test Fails?
- How Do I Configure the Network Between a DLI Queue and a Data Source?
- What Can I Do If a Datasource Connection Is Stuck in Creating State When I Try to Bind a Queue to It?
- How Do I Bind a Datasource Connection to a Pay-per-Use Queue?
- How Do I Connect DLI to Data Sources?
- Cross-Source Analysis
-
Datasource Connection O&M
- Why Is the Error Message "communication link failure" Displayed When I Use a Newly Activated Datasource Connection?
- Connection Times Out During MRS HBase Datasource Connection, and No Error Is Recorded in Logs
- Why Can't I Find the Subnet When Creating a DLI Datasource Connection?
- Error Message "Incorrect string value" Is Displayed When insert overwrite Is Executed on a Datasource RDS Table
- Null Pointer Error Is Displayed When the System Creates a Datasource RDS Table
- Error Message "org.postgresql.util.PSQLException: ERROR: tuple concurrently updated" Is Displayed When the System Executes insert overwrite on a Datasource GaussDB(DWS) Table
- RegionTooBusyException Is Reported When Data Is Imported to a CloudTable HBase Table Through a Datasource Table
- A Null Value Is Written Into a Non-Null Field When a DLI Datasource Connection Is Used to Connect to a GaussDB(DWS) Table
- An Insert Operation Failed After the Schema of the GaussDB(DWS) Source Table Is Updated
-
Datasource Connections
-
APIs
- How Do I Obtain the AK/SK Pair?
- How Do I Obtain the Project ID?
- Why Is Error "unsupported media Type" Reported When I Subimt a SQL Job?
- Are Project IDs of Different Accounts the Same When They Are Used to Call APIs?
- What Can I Do If an Error Is Reported When the Execution of the API for Creating a SQL Job Times Out?
- What Can I Do If Chinese Characters Returned by an API Are Garbled Characters?
- SDKs
- Change History
-
Flink Jobs
- Videos
-
SQL Syntax Reference (To Be Offline)
- Notice on Taking This Syntax Reference Offline
-
Spark SQL Syntax Reference (Unavailable Soon)
- Common Configuration Items of Batch SQL Jobs
- SQL Syntax Overview of Batch Jobs
- Databases
- Creating an OBS Table
- Creating a DLI Table
- Deleting a Table
- Viewing Tables
- Modifying a Table
-
Syntax for Partitioning a Table
- Adding Partition Data (Only OBS Tables Supported)
- Renaming a Partition (Only OBS Tables Supported)
- Deleting a Partition
- Deleting Partitions by Specifying Filter Criteria (Only OBS Tables Supported)
- Altering the Partition Location of a Table (Only OBS Tables Supported)
- Updating Partitioned Table Data (Only OBS Tables Supported)
- Updating Table Metadata with REFRESH TABLE
- Importing Data to the Table
- Inserting Data
- Clearing Data
- Exporting Search Results
- Backing Up and Restoring Data of Multiple Versions
- Table Lifecycle Management
- Creating a Datasource Connection with an HBase Table
- Creating a Datasource Connection with an OpenTSDB Table
- Creating a Datasource Connection with a DWS table
- Creating a Datasource Connection with an RDS Table
- Creating a Datasource Connection with a CSS Table
- Creating a Datasource Connection with a DCS Table
- Creating a Datasource Connection with a DDS Table
- Creating a Datasource Connection with an Oracle Table
- Views
- Viewing the Execution Plan
- Data Permissions Management
- Data Types
- User-Defined Functions
-
Built-in Functions
-
Date Functions
- Overview
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
String Functions
- Overview
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- Mathematical Functions
- Aggregate Functions
- Window Functions
- Other Functions
-
Date Functions
- Basic SELECT Statements
- Filtering
- Sorting
- Grouping
- JOIN
- Subquery
- Alias
- Set Operations
- WITH...AS
- CASE...WHEN
- OVER Clause
- Flink OpenSource SQL 1.12 Syntax Reference
-
Flink Opensource SQL 1.10 Syntax Reference
- Constraints and Definitions
- Flink OpenSource SQL 1.10 Syntax
-
Data Definition Language (DDL)
- Creating a Source Table
-
Creating a Result Table
- ClickHouse Result Table
- Kafka Result Table
- Upsert Kafka Result Table
- DIS Result Table
- JDBC Result Table
- GaussDB(DWS) Result Table
- Redis Result Table
- SMN Result Table
- HBase Result Table
- Elasticsearch Result Table
- OpenTSDB Result Table
- User-defined Result Table
- Print Result Table
- File System Result Table
- Creating a Dimension Table
- Data Manipulation Language (DML)
- Functions
-
Historical Versions (Unavailable Soon)
-
Flink SQL Syntax
- SQL Syntax Constraints and Definitions
- SQL Syntax Overview of Stream Jobs
- Creating a Source Stream
-
Creating a Sink Stream
- CloudTable HBase Sink Stream
- CloudTable OpenTSDB Sink Stream
- MRS OpenTSDB Sink Stream
- CSS Elasticsearch Sink Stream
- DCS Sink Stream
- DDS Sink Stream
- DIS Sink Stream
- DMS Sink Stream
- DWS Sink Stream (JDBC Mode)
- DWS Sink Stream (OBS-based Dumping)
- MRS HBase Sink Stream
- MRS Kafka Sink Stream
- Open-Source Kafka Sink Stream
- File System Sink Stream (Recommended)
- OBS Sink Stream
- RDS Sink Stream
- SMN Sink Stream
- Creating a Temporary Stream
- Creating a Dimension Table
- Custom Stream Ecosystem
- Data Type
- Built-In Functions
- User-Defined Functions
- Geographical Functions
- SELECT
- Condition Expression
- Window
- JOIN Between Stream Data and Table Data
- Configuring Time Models
- Pattern Matching
- StreamingML
- Reserved Keywords
-
Flink SQL Syntax
-
Identifiers
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- input_expression
- join_condition
- non_equi_join_condition
- number
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- select_statement
- separator
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- when_expression
- where_condition
- window_function
- Operators
- Change History
Result Table
Function
The FileSystem result (sink) table is used to export data to the HDFS or OBS file system. It is applicable to scenarios such as data dumping, big data analysis, data backup, and active, deep, or cold archiving.
Considering that the input stream can be unbounded, you can put the data in each bucket into part files of a limited size. Data can be written into a bucket based on time. For example, you can write data into a bucket every hour. This bucket contains the records received within one hour, and
data in the bucket directory is split into multiple part files. Each sink bucket that receives data contains at least one part file for each subtask. Other part files are created based on the configured rolling policy. For Row Formats, the default rolling policy is based on the part file size. You need to specify the maximum timeout period for opening a file and the timeout period for the inactive state after closing a file. Bulk Formats are rolled each time a checkpoint is created. You can add other rolling conditions based on size or time. For more information, see FileSystem SQL Connector.
- To use FileSink in STREAMING mode, you need to enable the checkpoint function. Part files are generated only when the checkpoint is successful. If the checkpoint function is not enabled, the files remain in the in-progress or pending state, and downstream systems cannot securely read the file data.
- The number recorded by the sink end operator is the number of checkpoints, not the actual volume of the sent data. For the actual volume, see the number recorded by the streaming-writer or StreamingFileWriter operator.
Caveats
On the Flink job's editing page, select Enable Checkpointing on the Running Parameters tab. Otherwise, data cannot be written to the FileSystem result table.
Syntax
1 2 3 4 5 6 7 8 9 10 11 |
CREATE TABLE sink_table ( name string, num INT, p_day string, p_hour string ) partitioned by (p_day, p_hour) WITH ( 'connector' = 'filesystem', 'path' = 'obs://*** ', 'format' = 'parquet', 'auto-compaction' = 'true' ); |
Usage
- Rolling Policy
The Rolling Policy defines when a given in-progress part file will be closed and moved to the pending and later to finished state. Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure.
In STREAMING mode, the Rolling Policy in combination with the checkpointing interval (pending files become finished on the next checkpoint) control how quickly part files become available for downstream readers and also the size and number of these parts. For details, see Parameter Description.
- Part File Lifecycle
To use the output of the FileSink in downstream systems, we need to understand the naming and lifecycle of the output files produced.
Part files can be in one of three states:
- In-progress: The part file that is currently being written to is in-progress.
- Pending: Closed (due to the specified rolling policy) in-progress files that are waiting to be committed.
- Finished: On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to Finished
Only finished files are safe to read by downstream systems as those are guaranteed to not be modified later.
By default, the file naming strategy is as follows:
- In-progress / Pending: part-<uid>-<partFileIndex>.inprogress.uid
- Finished: part-<uid>-<partFileIndex>
uid is a random ID assigned to a subtask of the sink when the subtask is instantiated. This uid is not fault-tolerant so it is regenerated when the subtask recovers from a failure.
- Compaction
FileSink supports compaction of the pending files, which allows the application to have smaller checkpoint interval without generating a lot of small files.
Once enabled, the compaction happens between the files become pending and get committed. The pending files will be first committed to temporary files whose path starts with a dot (.). Then these files will be compacted according to the strategy by the compactor specified by the users, and the new compacted pending files will be generated. Then these pending files will be emitted to the committer to be committed to the formal files. After that, the source files will be removed.
- Partitions
Filesystem sink supports the partitioning function. Partitions are generated based on the selected fields by using the partitioned by syntax. The following is an example:
path └── datetime=2022-06-25 └── hour=10 ├── part-0.parquet ├── part-1.parquet └── datetime=2022-06-26 └── hour=16 ├── part-0.parquet └── hour=17 ├── part-0.parquet
Similar to files, partitions also need to be submitted to notify downstream applications that files in the partitions can be securely read. Filesystem sink provides multiple configuration submission policies.
Parameter Description
Parameter |
Mandatory |
Default Value |
Data Type |
Description |
---|---|---|---|---|
connector |
Yes |
None |
String |
The value is fixed at filesystem. |
path |
Yes |
None |
String |
OBS path |
format |
Yes |
None |
String |
File format Available values are: csv and parquet |
sink.rolling-policy.file-size |
No |
128MB |
MemorySize |
Maximum size of a part file. If the size of a part file exceeds this value, a new file will be generated. The Rolling Policy defines when a given in-progress part file will be closed and moved to the pending and later to finished state. Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure. In STREAMING mode, the Rolling Policy in combination with the checkpointing interval (pending files become finished on the next checkpoint) control how quickly part files become available for downstream readers and also the size and number of these parts. |
sink.rolling-policy.rollover-interval |
No |
30 min |
Duration |
Maximum duration that a part file can be opened. If a part file is opened longer than the maximum duration, a new file will be generated in rolling mode. The default value is 30 minutes so that there will not be a large number of small files. The check frequency is specified by sink.rolling-policy.check-interval. There must be a space between the number and the unit. The supported time units include d, h, min, s, and ms. For bulk files (parquet, orc, and avro), the checkpoint interval also controls the maximum open duration of a part file. |
sink.rolling-policy.check-interval |
No |
1 min |
Duration |
Check interval of the time-based rolling policy This parameter controls the frequency of checking whether a file should be rolled based on sink.rolling-policy.rollover-interval. |
auto-compaction |
No |
false |
Boolean |
Whether automatic compaction is enabled for the streaming sink. Data is first written to temporary files. After the checkpoint is complete, the temporary files generated by the checkpoint are compacted. |
compaction.file-size |
No |
Size of sink.rolling-policy.file-size |
MemorySize |
Size of the files that will be compacted. The default value is the size of the files that will be rolled.
|
Example 1
create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min' ); INSERT into sink_table SELECT * from orders;
Example 2
create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'parquet', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min', 'auto-compaction'='true', 'compaction.file-size'='100m' ); INSERT into sink_table SELECT * from orders;
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.