Updated on 2024-11-29 GMT+08:00

Stream Load

Stream load is a synchronous import method. You import local files or data streams into Doris by sending HTTP requests. Stream load executes the import synchronously and returns the import result, so you can determine whether the import succeeded directly from the response body of the request.

Stream load is mainly suitable for importing local files or for importing data from data streams through programs. Data in CSV, JSON, Parquet, and ORC formats can be imported. CSV is the default format.

Syntax

  • Create a stream load import job.

    Stream load submits and transfers data over HTTP. The following curl commands show how to submit an import. You can also use other HTTP clients.

    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      curl -k --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT https://IP address of the Doris FE instance:HTTPS port number/api/{Database name}/{Table name}/_stream_load

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://IP address of the Doris FE instance:HTTP port number/api/{Database name}/{Table name}/_stream_load

    To view the IP address of the active Doris FE instance, log in to FusionInsight Manager and choose Cluster > Services > Doris > Instances.

    To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port.

    To view the HTTP port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for http_port.
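The two curl commands above differ only in the scheme, the port, and the -k flag. As a minimal sketch, the Python helper below assembles the same argument list for either mode. The function name build_stream_load_command and all sample values (host, port, database, table, credentials, label) are hypothetical and only illustrate the shape of the command.

```python
import shlex


def build_stream_load_command(fe_host, port, database, table, data_file,
                              user, password, headers=None, secure=False):
    """Assemble the curl argument list for a stream load request.

    secure=True mirrors the security-mode command (HTTPS plus -k);
    secure=False mirrors the normal-mode command (HTTP).
    """
    scheme = "https" if secure else "http"
    url = f"{scheme}://{fe_host}:{port}/api/{database}/{table}/_stream_load"
    argv = ["curl"]
    if secure:
        argv.append("-k")
    argv += ["--location-trusted", "-u", f"{user}:{password}"]
    # Each load parameter becomes one -H "key:value" pair.
    for key, value in (headers or {}).items():
        argv += ["-H", f"{key}:{value}"]
    argv += ["-T", data_file, "-XPUT", url]
    return argv


# Hypothetical values for illustration only.
command = build_stream_load_command(
    "192.168.1.1", 8030, "example_db", "test_stream_tbl", "data.csv",
    "user", "passwd", headers={"label": "table1_20230217"}, secure=False)
print(shlex.join(command))
```

The returned list can be passed to subprocess.run(command, capture_output=True, text=True) on a node where curl is installed; the response body is then available in the stdout attribute of the result.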

    Table 1 describes other parameters for creating a stream load task.

    Table 1 Parameters of a stream load task

    Parameter

    Description

    Signature parameter

    user:passwd

    Stream load creates the import job over HTTP and authenticates the request through HTTP Basic access authentication. Doris verifies the user identity and import permissions based on these credentials.

    Load parameters (format: -H "key1:value1")

    label

    Identifier of the import task. Each import task has a unique label within a database. The label is a user-defined name specified in the import command. With this label, you can view the execution status of the corresponding import task.

    column_separator

    Column separator in the file to be imported. The default value is \t. You can use a combination of multiple characters as the column separator. For an invisible character, specify the separator in hexadecimal with the prefix \x. For example, the separator \x01 used in Hive files is specified as -H "column_separator:\x01".

    line_delimiter

    Line delimiter in the file to be imported. The default value is \n. You can use a combination of multiple characters as the line delimiter.

    max_filter_ratio

    Maximum tolerance rate of the import task. The default value is 0, and the range of values is 0-1. When the import error rate exceeds this value, the import fails.

    where

    Filter criteria specified for an import task. Stream load lets you specify a where condition to filter raw data. Rows filtered out this way are not imported and are not included in the calculation of the filter ratio, but they are counted in num_rows_unselected.

    partitions

    Partition information of the table to be imported. If the data to be imported does not belong to the specified partitions, the data will not be imported and will be counted in dpp.abnorm.ALL.

    columns

    Column mapping and transformation configuration for the data to be imported, including column reordering and expression-based transformation. Expressions are written in the same way as in query statements.

    format

    Format of the data to be imported. The value can be CSV (default), JSON, Parquet, or ORC.

    exec_mem_limit

    Memory limit in bytes. The default value is 2 GB.

    strict_mode

    The strict mode affects the import behavior of certain values and the final imported data. You can declare strict_mode=true in the header to enable the strict mode. By default, the strict mode is disabled.

    The strict mode is used to restrict the filtering of column type conversions during import.

    • For column type conversions, if strict mode is true, incorrect data will be filtered. Incorrect data here refers to the originally non-null data that is converted into nulls.
    • If a column to be imported is converted by a function, the strict mode does not affect the column.
    • For an imported column type that contains range restrictions, if the original data can pass the type conversion normally, but cannot pass the range restrictions, the strict mode will not affect it. For example, if the type is decimal(1,0) and the original data is 10, it belongs to the range that can be converted by type but is not within the scope of the column declaration. The strict mode has no effect on this kind of data.

    merge_type

    Data merging type. The options are APPEND, DELETE, and MERGE. The default value is APPEND, which appends all data in this batch to the existing data. DELETE deletes all rows with the same key as the data in this batch. MERGE must be used together with a delete condition: data that meets the delete condition is processed according to DELETE semantics, and the rest is processed according to APPEND semantics.

    two_phase_commit

    Whether to enable the two-phase transaction commit mode for stream load. When it is enabled, the result is returned to you once the data is written. At this point, the data is invisible and the transaction status is PRECOMMITTED. The data becomes visible only after you commit the transaction.

    enable_profile

    If enable_profile is set to true, the stream load profile will be recorded in logs. Otherwise, it will not be recorded in logs.
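As a rough illustration of how the load parameters in Table 1 translate into -H "key:value" pairs, the Python sketch below builds a header dictionary. It writes invisible separator characters in the \x hexadecimal form and rejects a max_filter_ratio outside the range 0 to 1. The helper names encode_separator and build_load_headers are hypothetical, the sketch assumes single-byte (ASCII) separator characters, and only parameters that are explicitly set are emitted so that the server defaults apply to the rest.

```python
def encode_separator(separator):
    """Write invisible characters as \\x plus two hex digits; keep the rest."""
    return "".join(
        ch if ch.isprintable() else f"\\x{ord(ch):02x}"
        for ch in separator
    )


def build_load_headers(label, column_separator=None, line_delimiter=None,
                       max_filter_ratio=None, strict_mode=None):
    """Return the load parameters as a dict of header name to header value."""
    headers = {"label": label}
    if column_separator is not None:
        headers["column_separator"] = encode_separator(column_separator)
    if line_delimiter is not None:
        headers["line_delimiter"] = encode_separator(line_delimiter)
    if max_filter_ratio is not None:
        # Table 1 gives the valid range as 0-1.
        if not 0 <= max_filter_ratio <= 1:
            raise ValueError("max_filter_ratio must be between 0 and 1")
        headers["max_filter_ratio"] = str(max_filter_ratio)
    if strict_mode is not None:
        headers["strict_mode"] = "true" if strict_mode else "false"
    return headers


# Hypothetical label; \x01 is the Hive separator mentioned in Table 1.
headers = build_load_headers("table1_20230217", column_separator="\x01",
                             max_filter_ratio=0.1, strict_mode=True)
for key, value in headers.items():
    print(f'-H "{key}:{value}"')
```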

  • Returned results

    Because stream load imports data synchronously, the import result is returned directly in the response to the import request. The following is an example:

    {
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
    }

    Table 2 describes the parameters in the import result.

    Table 2 Parameters in a stream load result

    Parameter

    Description

    TxnId

    Transaction ID of the import.

    Label

    Import label, which is specified by the user or automatically generated by the system.

    Status

    Import completion status.

    • Success: indicates that the import is successful.
    • Publish Timeout: indicates that the import is also complete, but the data may become visible only after a delay. You do not need to retry.
    • Label Already Exists: indicates that the label is already in use and a different label is required.
    • Fail: indicates that the import failed.

    ExistingJobStatus

    Status of the load job that already uses the label.

    This field is displayed only when Status is "Label Already Exists". It tells you the status of the load job that already uses the label: "RUNNING" indicates that the job is still being executed, and "FINISHED" indicates that the job was executed successfully.

    Message

    Import error information.

    NumberTotalRows

    Total number of rows processed by the import.

    NumberLoadedRows

    Number of rows successfully imported.

    NumberFilteredRows

    Number of rows filtered out because they did not meet data quality requirements.

    NumberUnselectedRows

    Number of rows filtered by the where condition.

    LoadBytes

    Number of imported bytes.

    LoadTimeMs

    Time taken to complete the import, in milliseconds.

    BeginTxnTimeMs

    Time cost for requesting FE to begin a transaction, in milliseconds.

    StreamLoadPutTimeMs

    Time cost for requesting FE to obtain the data import plan, in milliseconds.

    ReadDataTimeMs

    Time cost for reading data, in milliseconds.

    WriteDataTimeMs

    Time cost for writing data, in milliseconds.

    CommitAndPublishTimeMs

    Time cost for requesting FE to commit and publish the transaction, in milliseconds.

    ErrorURL

    URL to view a specific error line if there are data quality problems.

    Because stream load is a synchronous import mode, import information is not recorded in the Doris system, and you cannot query a stream load job asynchronously with the commands used to view imports. To obtain the import result, check the return value of the import request.
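Because the response body is the only record of the outcome, a client usually needs to interpret it right away. The following Python sketch maps the Status and ExistingJobStatus values from Table 2 to a coarse outcome and derives an error rate from the row counters. The function names and outcome strings are hypothetical, and the error-rate formula is an assumption based on the statement that rows removed by the where condition are not involved in the filter ratio.

```python
import json


def classify_result(result):
    """Map a parsed stream load result to a coarse outcome string."""
    status = result.get("Status")
    if status in ("Success", "Publish Timeout"):
        # Publish Timeout also means the import is complete; no retry needed.
        return "loaded"
    if status == "Label Already Exists":
        existing = result.get("ExistingJobStatus")
        if existing == "FINISHED":
            return "label_in_use_finished"
        if existing == "RUNNING":
            return "label_in_use_running"
    return "failed"


def filter_ratio(result):
    """Share of rows rejected for data quality, ignoring unselected rows."""
    considered = result["NumberTotalRows"] - result["NumberUnselectedRows"]
    if considered == 0:
        return 0.0
    return result["NumberFilteredRows"] / considered


# Hypothetical response body with made-up counters.
body = """{"Status": "Success", "Message": "OK", "NumberTotalRows": 100,
"NumberLoadedRows": 75, "NumberFilteredRows": 5, "NumberUnselectedRows": 20}"""
result = json.loads(body)
print(classify_result(result), filter_ratio(result))
```

When the outcome is "failed" or the ratio is unexpectedly high, the Message and ErrorURL fields of the same result are the next places to look.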

  • Canceling Load

    You cannot manually cancel stream load tasks. Stream load will be automatically canceled by the system after a timeout or import error.

  • Viewing Stream Load

    You can view completed stream load tasks by running the show stream load statement.

    By default, the BE does not record stream load information. To view the information, set the enable_stream_load_record parameter to true.

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The nodes to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as the new user dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Installing a MySQL Client.

Stream Load Example

  1. Log in to the node where the MySQL client is installed and run the following commands to connect to the Doris database.

    If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command first:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -uDatabase login username -pDatabase login password -PConnection port for FE queries -hIP address of the Doris FE instance

    • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
    • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
    • You can also use the MySQL connection software or Doris web UI to connect to the database.

  2. Run the following statement to create a database:

    create database if not exists example_db;

  3. Run the following statement to create a table:

    CREATE TABLE example_db.test_stream_tbl (
        `c1` int NOT NULL,
        `c2` int NOT NULL,
        `c3` string NOT NULL,
        `c4` date NOT NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`c1`, `c2`)
    DISTRIBUTED BY HASH(`c1`) BUCKETS 1;

  4. Create the data.csv file and add the following content to the file:

    1,1,1,2020-02-21
    2,2,2,2020-03-21
    3,3,3,2020-04-21

  5. Use stream load to import the data in data.csv to the table created in step 3.

    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      curl -k --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv https://IP address of the Doris FE instance:HTTPS port/api/example_db/test_stream_tbl/_stream_load

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      curl --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv http://IP address of the Doris FE instance:HTTP port/api/example_db/test_stream_tbl/_stream_load

  6. Run the following command to view the table data:

    select * from example_db.test_stream_tbl;
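To make the mapping between data.csv and the table columns explicit, the Python sketch below splits each sample line on the column separator and converts the fields to the types declared for c1 to c4. It is a client-side sanity check only, under the assumption that every line holds exactly four comma-separated fields; it does not reproduce how Doris itself parses or converts the data, and the name parse_rows is hypothetical.

```python
from datetime import date

# Content of data.csv from step 4.
SAMPLE = "1,1,1,2020-02-21\n2,2,2,2020-03-21\n3,3,3,2020-04-21\n"


def parse_rows(text, column_separator=","):
    """Convert each line to (c1 int, c2 int, c3 string, c4 date)."""
    rows = []
    for number, line in enumerate(text.splitlines(), start=1):
        fields = line.split(column_separator)
        if len(fields) != 4:
            raise ValueError(f"line {number}: expected 4 fields, got {len(fields)}")
        c1, c2, c3, c4 = fields
        rows.append((int(c1), int(c2), c3, date.fromisoformat(c4)))
    return rows


rows = parse_rows(SAMPLE)
for row in rows:
    print(row)
```

A line that fails this check locally is a likely candidate for being counted in NumberFilteredRows once it is submitted.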

Parameter Configurations

Log in to FusionInsight Manager, choose Cluster > Services > Doris, click Configurations, and then click All Configurations.

  • Choose FE(Role) > Customization, and add the following parameters to fe.conf.customized.configs:

    stream_load_default_timeout_second: timeout interval of a stream load task, in seconds. If a load task is not complete within the specified time, the system cancels the task and the task status changes to CANCELLED. The default value is 600 seconds. If source files cannot be imported within this period, you can set another timeout period in the stream load request or change the value of stream_load_default_timeout_second to make the global timeout a longer period.

  • Choose BE(Role) > Customization and add the following parameters to be.conf.customized.configs:

    streaming_load_max_mb: maximum file size (in MB) allowed for a stream load task. The default value is 10240 (10 GB). If a source file exceeds the maximum size, increase the value of this parameter.
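Both limits can be checked on the client before a request is sent. The Python sketch below compares a file size with streaming_load_max_mb and builds a per-request timeout header. The helper names are hypothetical, 1 MB is assumed to mean 1024 x 1024 bytes, and the header name timeout is taken from the Apache Doris stream load documentation rather than from this section, so verify it against your Doris version.

```python
DEFAULT_STREAMING_LOAD_MAX_MB = 10240   # 10 GB expressed in MB
DEFAULT_TIMEOUT_SECONDS = 600           # stream_load_default_timeout_second


def fits_size_limit(size_bytes, max_mb=DEFAULT_STREAMING_LOAD_MAX_MB):
    """Return True if the file is within the BE size limit."""
    return size_bytes <= max_mb * 1024 * 1024


def timeout_header(seconds=DEFAULT_TIMEOUT_SECONDS):
    """Build a per-request timeout header (header name is an assumption)."""
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return {"timeout": str(int(seconds))}


# Example: a 12 GB file exceeds the default limit; 900 s overrides the default.
print(fits_size_limit(12 * 1024 ** 3))
print(timeout_header(900))
```

In practice the size would come from os.path.getsize on the source file, and the returned header would be added to the other load parameters of the request.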