Updated on 2024-05-29 GMT+08:00

Stream Load

Stream Load is a synchronous import mode. You send an HTTP request to import a local file or a data stream into Doris. Stream Load executes the import synchronously and returns the result, so you can determine whether the import succeeded from the response body.

Stream Load is suited to importing local files or importing data from data streams through programs. Data in CSV, JSON, Parquet, and ORC formats can be imported. CSV is the default format.

Syntax

  • Creating a Stream Load Import Task

    Stream Load submits and transfers data over HTTP. The following uses the curl command to demonstrate how to submit an import. You can also use other HTTP clients.

    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      curl -k --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT https://{Doris FE instance IP address}:{HTTPS port number}/api/{Database name}/{Table name}/_stream_load

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://{Doris FE instance IP address}:{HTTP port number}/api/{Database name}/{Table name}/_stream_load

    To view the IP address of the Doris FE instance, log in to FusionInsight Manager and choose Cluster > Services > Doris > Instances.

    To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port.

    To view the HTTP port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for http_port.
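
The two URL forms above differ only in the scheme and the port. As a minimal sketch for programs that submit imports with an HTTP client other than curl, the endpoint could be assembled as follows. The function name, host address, and port numbers are hypothetical; use the values shown in FusionInsight Manager for your cluster.

```python
from urllib.parse import quote


def stream_load_url(fe_host: str, port: int, database: str, table: str,
                    security_mode: bool) -> str:
    """Build the Stream Load endpoint of a Doris FE instance.

    security_mode=True  -> https (Kerberos enabled, pass the https_port value)
    security_mode=False -> http  (Kerberos disabled, pass the http_port value)
    """
    scheme = "https" if security_mode else "http"
    # Percent-encode the names so unusual characters cannot break the path.
    db = quote(database, safe="")
    tbl = quote(table, safe="")
    return f"{scheme}://{fe_host}:{port}/api/{db}/{tbl}/_stream_load"


# Hypothetical address and ports, for illustration only.
print(stream_load_url("192.0.2.10", 8030, "example_db", "test_stream_tbl", False))
print(stream_load_url("192.0.2.10", 8050, "example_db", "test_stream_tbl", True))
```

Keeping the scheme and port choice in one place avoids mixing an HTTPS port with an http:// URL when the same script is used against both security-mode and normal-mode clusters.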

    Table 1 describes other parameters for creating a stream load task.

    Table 1 Parameters of the Stream Load Task

    Parameter

    Description

    Signature parameters

    user:passwd

    Stream Load tasks are created and data is imported over HTTP, and requests are signed using HTTP basic access authentication. Doris verifies the user identity and import permissions based on the signature.

    Import task parameters (format: -H "key1:value1")

    label

    Identifier of the import task. Each import task has a label that is unique within a database. The label is a user-defined name specified in the import command and can be used to view the execution status of the corresponding import task.

    column_separator

    Column separator in the file to be imported. The default value is \t. A combination of multiple characters can be used as the column separator. For an invisible character, add the \x prefix and specify the separator in hexadecimal. For example, the Hive file separator \x01 is specified as -H "column_separator:\x01".

    line_delimiter

    Line delimiter in the file to be imported. The default value is \n. A combination of multiple characters can be used as the line delimiter.

    max_filter_ratio

    Maximum tolerable error rate of the import task. The default value is 0, and the value ranges from 0 to 1. If the import error rate exceeds this value, the import fails.

    where

    Filter condition of the import task. Stream Load allows you to specify a where clause to filter raw data. Rows filtered out by the where clause are not imported and are not included in the filter ratio calculation, but they are counted in num_rows_unselected (reported as NumberUnselectedRows in the import result).

    partitions

    Partitions of the table into which data is imported. Data that does not belong to the specified partitions is not imported and is counted in dpp.abnorm.ALL.

    columns

    Column mapping and transformation configuration for the data to be imported. Stream Load currently supports column order transformation and expression transformation. Expressions are written in the same way as in query statements.

    format

    Format of the data to be imported. The value can be CSV, JSON, Parquet, or ORC. The default value is CSV.

    exec_mem_limit

    Memory limit for the import, in bytes. The default value is 2 GB.

    strict_mode

    Specifies whether to enable strict mode for the import. To enable it, add -H "strict_mode:true" to the request. Strict mode is disabled by default.

    Strict mode strictly filters the results of column type conversion during data import. The policy is as follows:

    • For column type conversion, if strict mode is enabled, erroneous data is filtered out. Erroneous data is data whose original value is not null but whose result is null after column type conversion.
    • If a column to be imported is generated by a function transformation, strict mode does not affect the column.
    • If the type of a column to be imported has a range restriction and the original data can be converted but falls outside that range, strict mode does not affect the column. For example, if the type is decimal(1,0) and the original value is 10, the value can be converted but is outside the range allowed by the column. Strict mode does not affect such data.

    merge_type

    Data merge type. The value can be APPEND, DELETE, or MERGE. The default value is APPEND. APPEND adds the batch of data to the existing data. DELETE deletes all rows that have the same keys as the batch of data. MERGE must be used together with a delete condition: data that meets the delete condition is processed with DELETE semantics, and other data is processed with APPEND semantics.

    two_phase_commit

    Specifies whether to enable two-phase transaction commit for the import. When it is enabled, a response is returned after the data is written. At this point the data is invisible and the transaction status is PRECOMMITTED. The data becomes visible only after you manually trigger the commit operation.

    enable_profile

    If enable_profile is set to true, the stream load profile is recorded in logs. Otherwise, the stream load profile is not recorded in logs.
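
The import task parameters in Table 1 are all passed as -H "key:value" headers. The following is a minimal sketch of a helper that turns such parameters into curl arguments and rejects values outside the ranges stated above. The function name build_header_args is hypothetical, and only max_filter_ratio and merge_type are checked here.

```python
import shlex
from typing import List


def build_header_args(label: str, **options: str) -> List[str]:
    """Return curl arguments of the form ['-H', 'key:value', ...]."""
    ratio = options.get("max_filter_ratio")
    if ratio is not None and not 0 <= float(ratio) <= 1:
        raise ValueError("max_filter_ratio must be between 0 and 1")
    merge_type = options.get("merge_type", "APPEND")
    if merge_type not in ("APPEND", "DELETE", "MERGE"):
        raise ValueError("merge_type must be APPEND, DELETE, or MERGE")

    args = ["-H", f"label:{label}"]
    for key, value in options.items():
        args += ["-H", f"{key}:{value}"]
    return args


# A raw string keeps \x01 as the four characters backslash, x, 0, 1,
# which is the textual form the column_separator header expects.
args = build_header_args(
    "table1_20230217",
    column_separator=r"\x01",
    max_filter_ratio="0.1",
    strict_mode="true",
)
print(shlex.join(args))  # shell-quoted, ready to paste after "curl"
```

Validating locally is only a convenience. Doris still performs its own checks on every header when the request arrives.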

  • Returned Result

    Because Stream Load is synchronous, the import result is returned directly in the response to the import request. For example:

    {
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
    }

    Table 2 describes the parameters in the import result.

    Table 2 Parameters in the Stream Load Import Task Result

    Parameter

    Description

    TxnId

    Transaction ID of the import.

    Label

    Import label, which is specified by the user or automatically generated by the system.

    Status

    Import completion status. The options are as follows:

    • Success: The import is successful.
    • Publish Timeout: The import is also complete, but the data may become visible after a delay. You do not need to retry.
    • Label Already Exists: The label is already in use. Use a different label.
    • Fail: The import failed.

    ExistingJobStatus

    Status of the import job corresponding to an existing label.

    This field is displayed only when Status is "Label Already Exists". It shows the status of the import job that uses the existing label. "RUNNING" indicates that the job is still running, and "FINISHED" indicates that the job succeeded.

    Message

    Import error information.

    NumberTotalRows

    Total number of rows processed by the import.

    NumberLoadedRows

    Number of rows successfully imported.

    NumberFilteredRows

    Number of rows filtered out because the data does not meet requirements.

    NumberUnselectedRows

    Number of rows filtered by the where condition.

    LoadBytes

    Number of imported bytes.

    LoadTimeMs

    Time taken to complete the import, in milliseconds.

    BeginTxnTimeMs

    Time taken to request the FE to begin a transaction, in milliseconds.

    StreamLoadPutTimeMs

    Time taken to obtain the import execution plan from the FE, in milliseconds.

    ReadDataTimeMs

    Time taken to read data, in milliseconds.

    WriteDataTimeMs

    Time taken to write data, in milliseconds.

    CommitAndPublishTimeMs

    Time taken to request the FE to commit and publish the transaction, in milliseconds.

    ErrorURL

    If a data error occurs, access this URL to view the rows that caused the error.

    Because Stream Load is synchronous, the import information is not recorded in Doris, and you cannot query a Stream Load task asynchronously by running an import viewing command. To obtain the import result, check the response to the import request.
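
Because the response body is the only place where the result is reported, a calling program usually has to branch on Status. The following is a minimal sketch of that decision, based on the status values in Table 2. The function name and the returned action strings are hypothetical, and treating an existing FINISHED job as completed assumes the earlier request with the same label carried the same data.

```python
import json


def next_action(result: dict) -> str:
    """Map a Stream Load response to 'done', 'wait', 'inspect', or 'failed'."""
    status = result.get("Status")
    if status in ("Success", "Publish Timeout"):
        # Publish Timeout still means the import completed; no retry is needed.
        return "done"
    if status == "Label Already Exists":
        existing = result.get("ExistingJobStatus")
        if existing == "FINISHED":
            return "done"     # assumption: the earlier job loaded the same data
        if existing == "RUNNING":
            return "wait"     # the earlier job with this label is still running
        return "inspect"      # any other state needs a manual look
    return "failed"           # "Fail" or unrecognized; see Message and ErrorURL


# Trimmed, hypothetical response body for illustration.
body = '{"Status": "Label Already Exists", "ExistingJobStatus": "RUNNING"}'
print(next_action(json.loads(body)))  # prints: wait
```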

  • Canceling an Import

    You cannot manually cancel Stream Load tasks. The system automatically cancels a Stream Load task after a timeout or an import error.

  • Viewing Stream Load

    You can view completed Stream Load tasks by running the show stream load command.

    By default, the BE does not record Stream Load import information. To record and view it, set the enable_stream_load_record parameter to true.

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The node to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as the created dorisuser user, and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Installing a MySQL Client.

Stream Load Task Example

  1. Log in to the node where the MySQL client is installed and connect to the Doris database.

    If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command first:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    Run the following command to connect to the Doris database:

    mysql -u{Database login user} -p{Database login user password} -P{Database connection port} -h{Doris FE instance IP address}

    • The database connection port is the query connection port of the Doris FE. To view it, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and check the value of query_port.
    • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
    • You can also use the MySQL connection software or Doris WebUI to connect to the database.

  2. Run the following command to create a database:

    create database if not exists example_db;

  3. Run the following statement to create a table:

    CREATE TABLE example_db.test_stream_tbl (
      `c1` int NOT NULL,
      `c2` int NOT NULL,
      `c3` string NOT NULL,
      `c4` date NOT NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`c1`, `c2`)
    DISTRIBUTED BY HASH(`c1`) BUCKETS 1;

  4. Create the data.csv file. The file content is as follows:

    1,1,1,2020-02-21
    2,2,2,2020-03-21
    3,3,3,2020-04-21

  5. Use Stream Load to import the data in the data.csv file into the table created in step 3.

    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      curl -k --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv https://{IP address of the Doris FE instance}:{HTTPS port}/api/example_db/test_stream_tbl/_stream_load

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      curl --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv http://{IP address of the Doris FE instance}:{HTTP port}/api/example_db/test_stream_tbl/_stream_load

  6. Run the following command to view the table data:

    select * from example_db.test_stream_tbl;
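
Before running the import in step 5, the rows of data.csv can be sanity-checked locally against the column types of test_stream_tbl (int, int, string, date). The following is a minimal sketch of such a check. The function name check_rows is hypothetical, and Python's parsing rules are only an approximation of the type conversion that Doris performs.

```python
from datetime import date
from typing import List, Tuple


def check_rows(text: str, separator: str = ",") -> List[Tuple[int, str]]:
    """Return (line_number, reason) for rows that do not fit test_stream_tbl."""
    problems = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        if not line:
            continue  # ignore empty lines such as a trailing newline
        fields = line.split(separator)
        if len(fields) != 4:
            problems.append((line_no, f"expected 4 columns, got {len(fields)}"))
            continue
        try:
            int(fields[0])                  # c1 int
            int(fields[1])                  # c2 int
            date.fromisoformat(fields[3])   # c4 date, YYYY-MM-DD
        except ValueError as exc:
            problems.append((line_no, str(exc)))
        # c3 is a string column, so any value is accepted.
    return problems


sample = "1,1,1,2020-02-21\n2,2,2,2020-03-21\n3,3,3,2020-04-21\n"
print(check_rows(sample))  # prints: []
```

With the default max_filter_ratio of 0, a single row that fails conversion causes the whole import to fail, so a quick local check like this can save a round trip.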

Related Parameter Configurations

Log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations > All Configurations, and add the following parameters:

  • Choose FE(Role) > Customization, and add the following parameters to the customized parameter fe.conf.customized.configs:

    stream_load_default_timeout_second: timeout period of an import task, in seconds. The default value is 600. If an import task is not complete within this period, the system cancels the task and the task status changes to CANCELLED. If the source file cannot be imported within this period, set a separate timeout in the Stream Load request or increase the value of stream_load_default_timeout_second to change the global default.

  • Choose BE(Role) > Customization and add the following parameters to the custom parameter be.conf.customized.configs:

    streaming_load_max_mb: maximum size of a file that can be imported through Stream Load, in MB. The default value is 10240 (10 GB). If the source file is larger than this value, increase the value of this parameter.
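
Whether either parameter needs to be changed can be estimated before an import is submitted. The following is a minimal sketch that compares a file size with the two defaults described above. The function names are hypothetical, and the throughput figure is an assumption that you would have to measure on your own cluster.

```python
import math

DEFAULT_MAX_MB = 10240     # default of streaming_load_max_mb (10 GB, in MB)
DEFAULT_TIMEOUT_S = 600    # default of stream_load_default_timeout_second


def fits_size_limit(size_bytes: int, max_mb: int = DEFAULT_MAX_MB) -> bool:
    """True if the file does not exceed the streaming_load_max_mb limit."""
    return size_bytes <= max_mb * 1024 * 1024


def needed_timeout(size_bytes: int, throughput_mb_per_s: float) -> int:
    """Rough number of seconds needed at the assumed import throughput."""
    return math.ceil(size_bytes / (throughput_mb_per_s * 1024 * 1024))


size = 8 * 1024 ** 3                  # hypothetical 8 GB source file
seconds = needed_timeout(size, 10)    # assumed 10 MB/s, measure your own
print(fits_size_limit(size))          # prints: True
print(seconds > DEFAULT_TIMEOUT_S)    # prints: True, a longer timeout is needed
```

In this example the file is within the size limit but would not finish in 600 seconds, so only the timeout needs to be raised. Upstream Apache Doris accepts a per-request timeout header for this purpose; treat the header name as an assumption and confirm it for your cluster version.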