Importing Data to a Doris Cluster with Stream Load
Stream Load is a synchronous import method. You can import local files or data streams into Doris by sending HTTP requests. Stream Load synchronously executes the import and returns the import result. You can determine whether the import is successful based on the response body.
Stream Load is mainly used to import local files, or to import data from a data stream through a program.
Basic Principles
The following shows the main flow of Stream Load, omitting some import details.
                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distribute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+
In Stream Load, Doris selects a node as the Coordinator node. This node receives the data and distributes it to the other data nodes. You can submit the import command over HTTP. If the command is submitted to an FE node, the FE node forwards the request to a BE node through an HTTP redirect. You can also submit the import command directly to a specified BE node. The Coordinator BE node returns the final import result.
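As a sketch of the two submission paths in the figure (fe_host, be_host, and the credentials are placeholders; 8030 and 8040 are the FE and BE HTTP ports mentioned below), --location-trusted lets curl re-send the credentials when the FE redirects the request to the Coordinator BE:
# 1A. Submit the load to the FE; the FE redirects it to a Coordinator BE.
curl --location-trusted -u user:passwd -T data.file -XPUT http://fe_host:8030/api/{db}/{table}/_stream_load
# 1B. Submit the load directly to a specified BE, skipping the redirect.
curl --location-trusted -u user:passwd -T data.file -XPUT http://be_host:8040/api/{db}/{table}/_stream_load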
Basic Operations

Before using Stream Load to import data, ensure that ports 8030 and 8040 are opened in the security group of the Doris cluster. Otherwise, the connection to Stream Load will time out.
- Creating an import job
Stream Load submits and transfers data over HTTP. The following curl command shows how to submit an import job; you can also use other HTTP clients.
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

- The properties supported in the header are described in the import parameters below.
- Headers use the format -H "key1:value1".
- http_port specifies the HTTP port of the FE node (port 8030, as noted above).
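For example, a label, a comma separator, and a tolerance rate can be set in one request (the label value and file name below are hypothetical):
curl --location-trusted -u user:passwd -H "label:load_orders_20240101_001" -H "column_separator:," -H "max_filter_ratio:0.1" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load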
You can run HELP STREAM LOAD to view the detailed syntax for creating an import job. All parameters related to Stream Load import jobs are set in the header. The following table describes related parameters.
Table 1 Stream Load parameters

Parameter type | Parameter | Description
---|---|---
Signature parameter | user/passwd | Stream Load creates the import job over HTTP and signs it through basic access authentication. Doris verifies the user identity and import permissions based on the signature.
Import parameters | label | A label identifies an import job and must be unique within a database. It is a user-defined name in the import command, and you can use it to view the execution status of the job. Labels also prevent repeated data imports: it is strongly recommended that you use the same label for the same batch of data, so that repeated requests for that batch are accepted only once, guaranteeing At-Most-Once semantics. When the import job corresponding to a label is in the CANCELLED state, the label can be used again.
 | column_separator | Column separator of the imported file. \t is used by default. If the separator is an invisible character, prefix it with \x and write it in hexadecimal notation; for example, the Hive file separator \x01 must be specified as -H "column_separator:\x01". A combination of multiple characters can be used as the column separator.
 | line_delimiter | Line delimiter of the imported file. \n is used by default. A combination of multiple characters can be used as the line delimiter.
 | max_filter_ratio | Maximum tolerance rate of the import job. The default value is 0, and the value ranges from 0 to 1. When the import error rate exceeds this value, the import fails. To ignore error rows and ensure a successful import, set this parameter to a value greater than 0. The import fails when dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL) > max_filter_ratio, where dpp.abnorm.ALL is the number of rows with unqualified data quality (for example, mismatched type, column count, or length) and dpp.norm.ALL is the number of correct rows. You can run SHOW LOAD to query the correct data volume of the import job. Number of rows in the source file = dpp.abnorm.ALL + dpp.norm.ALL.
 | where | Filter criteria of the import job. Stream Load supports filtering raw data with a where clause. Filtered data is not imported and is not counted in the filter ratio, but it is counted in num_rows_unselected.
 | partitions | Partitions of the table to be imported. Data that does not belong to the specified partitions is not imported and is counted in dpp.abnorm.ALL.
 | columns | Function transformation configuration of the data to be imported, including reordering of columns and expression transformation. Expressions are written in the same way as in a query statement.
 | exec_mem_limit | Import memory limit, in bytes. The default value is 2 GB.
 | strict_mode | Whether to enable strict mode for the import. Strict mode is disabled by default; to enable it, specify strict_mode as true in the header. In strict mode, column type conversion during import is strictly filtered: data that is originally non-null but becomes null after conversion is filtered out. Columns generated by a function transformation are not affected by strict mode. Columns whose type has a range restriction are also not affected when the raw data can be converted but falls outside the range; for example, if the type is decimal(1,0) and the raw value is 10, the value can be converted but exceeds the column range, and strict mode does not filter it.
 | merge_type | Data merge type. The value can be APPEND, DELETE, or MERGE. APPEND (the default) appends this batch of data to the existing data. DELETE deletes all rows with the same keys as this batch of data. MERGE must be used together with a DELETE condition: rows that meet the DELETE condition are processed with DELETE semantics, and the rest are processed with APPEND semantics.
 | two_phase_commit | Whether to use the two-phase transaction commit mode. In this mode, data is written and a message is returned, but the data is invisible and the transaction is in the PRECOMMITTED state. The data becomes visible only after you manually trigger the commit operation; see the sketch after this table.
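The following is a minimal sketch of the two-phase commit flow through the Doris _stream_load_2pc interface. The transaction ID 18036 is a placeholder and should be taken from the TxnId field returned by the first request; depending on the Doris version, two-phase commit may also need to be enabled on the BE side first.
# Phase 1: write the data; the transaction stays in the PRECOMMITTED state and the data is not yet visible.
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
# Phase 2: commit the transaction to make the data visible (use txn_operation:abort to roll it back instead).
curl --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc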
- Example 1: Import data in CSV format.
- Create a Doris table.
CREATE TABLE cloudtable0327.doris_streameload_test01 (
    user_id bigint,
    date date,
    group_id bigint,
    modify_date date,
    keyword VARCHAR(128)
)
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
    'function_column.sequence_col' = 'modify_date',
    'replication_num' = '3',
    'in_memory' = 'false'
);
- Prepare the data file sequencedata01.csv with the following content (columns in table order: user_id, date, group_id, modify_date, keyword).
Table 2 sequencedata01.csv

1,2020-02-22,1,2020-02-21,a
1,2020-02-22,1,2020-02-22,b
1,2020-02-22,1,2020-03-05,c
1,2020-02-22,1,2020-02-26,d
1,2020-02-22,1,2020-02-23,e
1,2020-02-22,1,2020-02-24,b
- Run the curl command to load data.
curl -k --location-trusted -u admin:passwd -T sequencedata01.csv -H 'column_separator:,' https://{fe_host}:{http_port}/api/cloudtable0327/doris_streameload_test01/_stream_load
- View the returned result.
Since Stream Load imports data synchronously, the import result is returned directly in the response to the import request.
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://fe_host:http_port/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" }
- The following table describes the parameters of the Stream Load import result.
Table 3 Parameters

Parameter | Description
---|---
TxnId | Import transaction ID, which is invisible to users.
Label | Import label, which is user-defined or automatically generated by the system.
Status | Import completion status. Success: the import is successful. Publish Timeout: the import is complete but data visibility may be delayed; no retry is needed. Label Already Exists: the label is duplicate and needs to be replaced. Fail: the import failed.
ExistingJobStatus | Status of the import job corresponding to the existing label. This field is returned only when Status is Label Already Exists. RUNNING indicates that the job is still being executed, and FINISHED indicates that the job succeeded.
Message | Import error information.
NumberTotalRows | Total number of rows processed during the import.
NumberLoadedRows | Number of rows successfully imported.
NumberFilteredRows | Number of rows whose data quality is unqualified.
NumberUnselectedRows | Number of rows filtered out by the where condition.
LoadBytes | Number of imported bytes.
LoadTimeMs | Time taken to complete the import, in milliseconds.
BeginTxnTimeMs | Time taken to request the FE node to begin a transaction, in milliseconds.
StreamLoadPutTimeMs | Time taken to request the import execution plan from the FE node, in milliseconds.
ReadDataTimeMs | Time taken to read data, in milliseconds.
WriteDataTimeMs | Time taken to write data, in milliseconds.
CommitAndPublishTimeMs | Time taken to request the FE node to commit and publish the transaction, in milliseconds.
ErrorURL | URL for viewing the specific error rows if there are data quality problems.
Since Stream Load is a synchronous import mode, import information is not recorded in Doris, so you cannot view Stream Load jobs with the commands used to check asynchronous imports. To obtain the import result, check the return value of the import request, as in the following sketch.
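Because the result exists only in this response, a script typically has to capture and inspect it. The following is a minimal sketch, assuming the jq tool is available on the client; Status and ErrorURL are the fields shown in the example response above, and the file and host names are placeholders.
# Capture the synchronous Stream Load response and stop if the load did not succeed.
RESP=$(curl -s --location-trusted -u user:passwd -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load)
echo "$RESP"
STATUS=$(echo "$RESP" | jq -r '.Status')
if [ "$STATUS" != "Success" ] && [ "$STATUS" != "Publish Timeout" ]; then
  # For data quality failures, the ErrorURL field points to the rejected rows.
  echo "Stream Load failed with status: $STATUS" >&2
  exit 1
fi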
- Example 2: Import data in JSON format.
Prepare data in JSON format, save it in the testjson.json file, and upload the file to the Doris client. (A variant for files that contain multiple JSON records is sketched after this example.)
{"id": 100, "city": "B", "code" : 1}
- Create a Doris table.
CREATE TABLE `doris_testjson01` (
    `id` varchar(32) NOT NULL,
    `city` ARRAY<int(11)>,
    `code` int(11)
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    'replication_allocation' = 'tag.location.default: 3',
    'in_memory' = 'false',
    'storage_format' = 'V2'
);
- Run the curl command to load data.
curl --location-trusted -u admin:{Doris cluster password} -H 'format: json' -T testjson.json https://fe_host:http_port/api/{Doris database}/doris_testjson01/_stream_load -k
- Query data.
select * from doris_testjson01;
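If a file contains a JSON array with several records instead of a single object, the strip_outer_array header tells Stream Load to expand the array into multiple rows. A minimal sketch (the file name testjson_array.json is hypothetical):
curl --location-trusted -u admin:{Doris cluster password} -H 'format: json' -H 'strip_outer_array: true' -T testjson_array.json https://fe_host:http_port/api/{Doris database}/doris_testjson01/_stream_load -k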
Canceling Data Import
You cannot manually cancel Stream Load jobs. A Stream Load job is automatically canceled by the system when it times out or an import error occurs.
Viewing Stream Load Jobs
You can run the SHOW STREAM LOAD command to view completed Stream Load jobs.
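For example, a minimal sketch of querying the history through the MySQL protocol port of the FE node (the host, port 9030, and database name are placeholders; the BEs must be configured to record Stream Load jobs, for example with enable_stream_load_record, for the statement to return data):
mysql -h fe_host -P 9030 -u admin -p -e "SHOW STREAM LOAD FROM cloudtable0327;"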