Stream Load
Stream load is a synchronous import method. Users import local files or data streams into Doris by sending requests over the HTTP protocol. Stream load executes the import synchronously and returns the result, so users can determine whether the import succeeded directly from the response body.
Stream load is mainly suitable for importing local files, or data from data streams, through programs. Data in CSV, Parquet, and ORC formats can be imported. Only the CSV format is supported by default.
Syntax
- Create a stream load import job.
Stream load submits and transfers data over the HTTP protocol. The following curl commands show how to submit an import job. You can also use other HTTP clients.
- Kerberos authentication is enabled for the cluster (the cluster is in security mode)
curl -k --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT https://IP address of the Doris FE instance:HTTPS port number/api/{Database name}/{Table name}/_stream_load
- Kerberos authentication is disabled for the cluster (the cluster is in normal mode)
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://IP address of the Doris FE instance:HTTP port number/api/{Database name}/{Table name}/_stream_load
To view the IP address of the active Doris FE instance, log in to FusionInsight Manager and choose Cluster > Services > Doris > Instances.
You can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port to view the HTTPS port.
To view the HTTP port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for http_port.
Table 1 describes other parameters for creating a stream load task.
Table 1 Parameters of a stream load task
Signature parameter
user:passwd
Stream load creates the import job over the HTTP protocol and signs the request using HTTP Basic access authentication. Doris verifies the user identity and import permissions based on the signature.
Load parameters (format: -H "key1:value1")
label
Identifier of the import task. Each import task has a unique label within a single database. The label is a user-defined name in the import command. With this label, you can view the execution status of the corresponding import task.
column_separator
Column separator in the file to be imported. The default value is \t. A combination of multiple characters can be used as the column separator. For an invisible character, prefix it with \x and specify the separator in hexadecimal. For example, the \x01 separator of a Hive file must be specified as -H "column_separator:\x01".
line_delimiter
Line delimiter in the file to be imported. The default value is \n. You can use a combination of multiple characters as the line delimiter.
max_filter_ratio
Maximum tolerable ratio of filtered (unqualified) rows for the import task. The default value is 0, and the value ranges from 0 to 1. If the error rate of the import exceeds this value, the import fails.
where
Filter criteria specified for an import task. Stream Load allows you to specify where statements to filter raw data. The filtered data will not be imported or involved in the calculation of filter ratio, but will be counted as num_rows_unselected.
partitions
Partition information of the table to be imported. If the data to be imported does not belong to the specified partitions, it will not be imported and will be counted in dpp.abnorm.ALL.
columns
Function transformation configuration for the data to be imported, including column reordering and expression transformation. The expression transformation syntax is the same as in query statements.
format
Format of the data to be imported. The value can be CSV (default), JSON, Parquet, or ORC.
exec_mem_limit
Import memory limit, in bytes. The default value is 2 GB.
strict_mode
The strict mode affects the import behavior of certain values and the final imported data. You can declare strict_mode=true in the header to enable the strict mode. By default, the strict mode is disabled.
The strict mode is used to restrict the filtering of column type conversions during import.
- For column type conversions, if strict mode is true, incorrect data is filtered out. Incorrect data here refers to originally non-null data that becomes null after type conversion.
- If a column to be imported is converted by a function, the strict mode does not affect the column.
- For an imported column type that includes range restrictions, if the original data passes the type conversion but fails the range restriction, strict mode does not affect it. For example, for type decimal(1,0) and original data 10, the value can be converted by type but is outside the declared range. Strict mode has no effect on such data.
merge_type
Data merging type. The options are APPEND, DELETE, and MERGE. The default value is APPEND, which appends this batch of data to the existing data. DELETE deletes all rows with the same keys as this batch of data. MERGE must be used together with a delete condition: data that meets the delete condition is processed according to DELETE semantics, and the rest is processed according to APPEND semantics.
two_phase_commit
Stream load can enable the two-phase transaction commit mode. In this mode, data is written during the stream load and the result is returned to the client, but the data is invisible and the transaction status is PRECOMMITTED. The data becomes visible only after you commit the transaction.
enable_profile
If enable_profile is set to true, the stream load profile will be recorded in logs. Otherwise, it will not be recorded in logs.
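As an illustration, several of the load parameters in Table 1 can be combined in one request. In the following sketch, the host name, port, credentials, label, and filter values are placeholders, and the two-phase commit endpoint should be verified against your Doris version:

```shell
# Sketch: a stream load request combining several headers from Table 1.
# fe_host, http_port, user:passwd, and the label are placeholders.
curl --location-trusted -u user:passwd \
  -H "label:load_20230217_001" \
  -H "column_separator:," \
  -H "max_filter_ratio:0.1" \
  -H "two_phase_commit:true" \
  -T data.csv \
  -XPUT http://fe_host:http_port/api/example_db/test_stream_tbl/_stream_load

# With two_phase_commit:true the loaded data stays in PRECOMMITTED state.
# Committing (or aborting) the transaction by its TxnId makes it visible:
curl --location-trusted -u user:passwd \
  -H "txn_id:1003" \
  -H "txn_operation:commit" \
  -XPUT http://fe_host:http_port/api/example_db/_stream_load_2pc
```

For a cluster in security mode, add -k and use the HTTPS address, as in the commands shown earlier.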
- Returned results
Since stream load imports data synchronously, the import result is returned to the user directly in the response to the import request. The following is an example:
{
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
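Because the result is returned synchronously, a loading script can check the Status field of the response directly. A minimal sketch, using an abbreviated copy of the example response (the result.json file name is arbitrary):

```shell
# Sketch: decide success or failure from a saved stream load response.
# result.json holds an abbreviated copy of the example response above.
cat > result.json <<'EOF'
{"TxnId": 1003, "Status": "Success", "NumberLoadedRows": 1000000, "NumberFilteredRows": 1}
EOF

# "Success" and "Publish Timeout" both mean the data was loaded.
status=$(python3 -c "import json; print(json.load(open('result.json'))['Status'])")
if [ "$status" = "Success" ] || [ "$status" = "Publish Timeout" ]; then
  echo "load ok"
else
  echo "load failed: $status"
fi
```

In a real script, result.json would be the output captured from the curl command, for example with curl ... -o result.json.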
Table 2 describes the parameters in the import result.
Table 2 Parameters in a stream load result
TxnId
The imported transaction ID.
Label
Import label, which is specified by the user or automatically generated by the system.
Status
Import completion status.
- Success: The import is successful.
- Publish Timeout: The import is also complete, but the data may become visible after a delay. There is no need to retry.
- Label Already Exists: The label already exists and needs to be changed.
- Fail: The import failed.
ExistingJobStatus
The state of the load job corresponding to the existing Label.
This field is displayed only when Status is Label Already Exists. It shows the status of the load job corresponding to the existing label: RUNNING indicates that the job is still being executed, and FINISHED indicates that the job was successful.
Message
Import error information.
NumberTotalRows
Total number of rows processed by the import task.
NumberLoadedRows
Number of rows successfully imported.
NumberFilteredRows
Number of rows filtered out due to unqualified data quality.
NumberUnselectedRows
Number of rows filtered by the where condition.
LoadBytes
Number of imported bytes.
LoadTimeMs
Time taken to complete the import, in milliseconds.
BeginTxnTimeMs
Time cost for requesting FE to begin a transaction. The unit is millisecond.
StreamLoadPutTimeMs
Time cost for requesting FE to obtain the data import plan. The unit is millisecond.
ReadDataTimeMs
Time cost for reading data, in milliseconds.
WriteDataTimeMs
Time cost for writing data, in milliseconds.
CommitAndPublishTimeMs
Time cost for requesting FE to commit and publish the transaction, in milliseconds.
ErrorURL
URL to view a specific error line if there are data quality problems.
Since stream load is a synchronous import mode, import information is not recorded in the Doris system, so stream load jobs cannot be viewed with the import commands used for asynchronous loads. You need to check the response to the import request to obtain the import result.
- Canceling Load
You cannot manually cancel stream load tasks. Stream load will be automatically canceled by the system after a timeout or import error.
- Viewing Stream Load
You can view completed stream load tasks through show stream load.
By default, the BE does not record stream load information. To view the information, you need to set the enable_stream_load_record=true parameter.
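Once enable_stream_load_record is set to true, completed tasks can be listed from the MySQL client. A sketch (the connection parameters are placeholders, and the SHOW STREAM LOAD syntax should be verified against your Doris version):

```shell
# Sketch: list recorded stream load tasks for a database.
# User, password, port, and host are placeholders for your cluster.
mysql -udorisuser -ppasswd -P9030 -hfe_host \
  -e "SHOW STREAM LOAD FROM example_db LIMIT 10;"
```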
Prerequisite
- A cluster containing the Doris service has been created, and all services in the cluster are running properly.
- The nodes to be connected to the Doris database can communicate with the MRS cluster.
- A user with Doris management permission has been created.
- Kerberos authentication is enabled for the cluster (the cluster is in security mode)
Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.
Log in to FusionInsight Manager as the new user dorisuser and change the initial password.
- Kerberos authentication is disabled for the cluster (the cluster is in normal mode)
After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.
- The MySQL client has been installed. For details, see Installing a MySQL Client.
Stream Load Example
- Log in to the node where the MySQL client is installed and run the following command to connect to the Doris database:
If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command to connect to the Doris database:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -uDatabase login username -pDatabase login password -PConnection port for FE queries -hIP address of the Doris FE instance
- To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
- To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
- You can also use the MySQL connection software or Doris web UI to connect to the database.
- Run the following statement to create a database:
create database if not exists example_db;
- Run the following statement to create a table:
CREATE TABLE example_db.test_stream_tbl (
`c1` int NOT NULL,
`c2` int NOT NULL,
`c3` string NOT NULL,
`c4` date NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`c1`, `c2`)
DISTRIBUTED BY HASH(`c1`) BUCKETS 1;
- Create the data.csv file and add the following content to the file:
1,1,1,2020-02-21
2,2,2,2020-03-21
3,3,3,2020-04-21
- Use stream load to import data.csv file data to the table created in 3.
- Kerberos authentication is enabled for the cluster (the cluster is in security mode)
curl -k --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv https://IP address of the Doris FE instance:HTTPS port/api/example_db/test_stream_tbl/_stream_load
- Kerberos authentication is disabled for the cluster (the cluster is in normal mode)
curl --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv http://IP address of the Doris FE instance:HTTP port/api/example_db/test_stream_tbl/_stream_load
- Run the following command to view the table data:
select * from example_db.test_stream_tbl;
Parameter Configurations
Log in to FusionInsight Manager, choose Cluster > Services > Doris, and click Configurations then All Configurations.
- Choose FE(Role) > Customization, and add the following parameters to fe.conf.customized.configs:
stream_load_default_timeout_second: timeout interval of a stream load task, in seconds. If a load task is not completed within this period, the system cancels it and the task status changes to CANCELLED. The default value is 600 seconds. If source files cannot be imported within this period, set a longer timeout in the stream load request, or increase the value of stream_load_default_timeout_second to extend the global timeout.
- Choose BE(Role) > Customization and add the following parameters to be.conf.customized.configs:
streaming_load_max_mb: maximum file size (in MB) allowed for the stream load task. The default size is 10 GB. If a source file exceeds the maximum size, you need to adjust the value of this parameter.
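Instead of raising the global default, a single stream load request can carry its own timeout through the timeout header. A sketch (host, port, credentials, label, and file name are placeholders; 1200 seconds applies only to this task):

```shell
# Sketch: override the stream load timeout for one request only.
# The global stream_load_default_timeout_second value stays unchanged.
curl --location-trusted -u user:passwd \
  -H "label:big_file_20230217" \
  -H "timeout:1200" \
  -T big_data.csv \
  -XPUT http://fe_host:http_port/api/example_db/test_stream_tbl/_stream_load
```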