文档首页/ 表格存储服务 CloudTable/ 用户指南/ 使用Doris/ 导入数据至Doris集群/ 使用Stream Load方式导入数据至Doris集群
更新时间:2025-07-24 GMT+08:00
分享

使用Stream Load方式导入数据至Doris集群

Stream load是一个同步的导入方式,用户通过发送HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream load同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。

Stream load主要适用于导入本地文件,或通过程序导入数据流中的数据。

基本原理

下图展示了Stream load的主要流程,省略了一些导入细节。

  ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distribute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

Stream load中,Doris会选定一个节点作为Coordinator节点。该节点负责接数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。用户也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给用户。

基本操作

在执行数据导入Stream Load操作之前,必须确保Doris集群的安全组端口开放,即8030和8040端口,否则Stream Load操作将会连接超时。

  • 创建导入

    Stream Load通过HTTP协议提交和传输数据。这里通过curl命令展示如何提交导入。

    用户也可以通过其他HTTP client进行操作。

    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
  • Header中支持属性见下面的‘导入任务参数’说明。
  • 格式为:-H "key1:value1"。
  • port:HTTP的端口。

创建导入任务的详细语法可以通过HELP STREAM LOAD命令查看。Stream Load中所有与导入任务相关的参数均设置在Header中。相关参数描述如下表所示。

表1 参数说明

参数

说明

签名参数

user/passwd

Stream load由于创建导入的协议使用的是HTTP协议,通过Basic access authentication进行签名。Doris系统会根据签名验证用户身份和导入权限。

导入任务参数

label

导入任务的标识。每个导入任务,都有一个在单database内部唯一的label。label是用户在导入命令中自定义的名称。通过这个label,用户可以查看对应导入任务的执行情况。

label的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。当label对应的导入作业状态为CANCELLED时,该label可以再次被使用。

column_separator

用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。

如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。

可以使用多个字符的组合作为列分隔符。

line_delimiter

用于指定导入文件中的换行符,默认为\n。

可以使用做多个字符的组合作为换行符。

max_filter_ratio

导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。

如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。

计算公式为:

(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio

dpp.abnorm.ALL表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。

dpp.norm.ALL指的是导入过程中正确数据的条数。可以通过SHOW LOAD命令查询导入任务的正确数据量。

原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL。

where

导入任务指定的过滤条件。Stream load支持对原始数据指定where语句进行过滤。被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。

Partitions

待导入表的Partition信息,如果待导入数据不属于指定的Partition则不会被导入。这些数据将计入dpp.abnorm.ALL。

columns

待导入数据的函数变换配置,目前Stream load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

exec_mem_limit

导入内存限制。默认为2GB,单位为字节。

strict_mode

Stream Load导入可以开启strict mode模式。开启方式为在HEADER中声明strict_mode=true 。默认的strict mode为关闭。

strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

对于列类型转换来说,如果strict mode为true,则错误的数据将被filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。对于导入的某列由函数变换生成时,strict mode对其不产生影响。对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode对其也不产生影响。例如:如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据strict对其不产生影响。

merge_type

数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,Delete表示删除与这批数据Key相同的所有行,MERGE语义需要与Delete条件联合使用,表示满足Delete条件的数据按照Delete语义处理其余的按照APPEND语义处理。

two_phase_commit

Stream load导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。

示例:

  • 发起Stream load预提交操作。
    curl  --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
    {
        "TxnId": 18036,
        "Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
        "TwoPhaseCommit": "true",
        "Status": "Success",
        "Message": "OK",
        "NumberTotalRows": 100,
        "NumberLoadedRows": 100,
        "NumberFilteredRows": 0,
        "NumberUnselectedRows": 0,
        "LoadBytes": 1031,
        "LoadTimeMs": 77,
        "BeginTxnTimeMs": 1,
        "StreamLoadPutTimeMs": 1,
        "ReadDataTimeMs": 0,
        "WriteDataTimeMs": 58,
        "CommitAndPublishTimeMs": 0
    }
  • 对事务触发commit操作:
    • 注意1:请求发往fe或be均可。
    • 注意2:commit的时候可以省略url中的{table}。
      curl -X PUT --location-trusted -u user:passwd  -H "txn_id:18036" -H "txn_operation:commit"  http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc
      {
          "status": "Success",
          "msg": "transaction [18036] commit successfully."
      }
  • 对事务触发abort操作:
    • 注意1:请求发往FE或BE均可。
    • 注意2:abort的时候可以省略URL中的 {table}。
      curl -X PUT --location-trusted -u user:passwd  -H "txn_id:18037" -H "txn_operation:abort"  http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc
      {
          "status": "Success",
          "msg": "transaction [18037] abort successfully."
      }
  • 示例1,CSV数据格式导入。
    • 创建Doris表
      CREATE TABLE cloudtable0327.doris_streameload_test01
      (
      user_id bigint,
      date date,
      group_id bigint,
      modify_date date,
      keyword VARCHAR(128)
      )
      UNIQUE KEY(user_id, date, group_id)
      DISTRIBUTED BY HASH (user_id) BUCKETS 32
      PROPERTIES(
      'function_column.sequence_col' = 'modify_date',
      'replication_num' = '3',
      'in_memory' = 'false'
      );
    • 准备数据表sequencedata01.csv。
      表2 sequencedata01.csv

      1

      2020-02-22

      1

      2020-02-21

      a

      1

      2020-02-22

      1

      2020-02-22

      b

      1

      2020-02-22

      1

      2020-03-05

      c

      1

      2020-02-22

      1

      2020-02-26

      d

      1

      2020-02-22

      1

      2020-02-23

      e

      1

      2020-02-22

      1

      2020-02-24

      b

    • 执行curl命令load数据。
      curl -k --location-trusted -u admin:passwd -T sequencedata01.csv -H 'column_separator:,' https://{fe_host}:{http_port}/api/cloudtable0327/doris_streameload_test01/_stream_load
    • 返回结果。

      由于Stream load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。

      {
          "TxnId": 1003,
          "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
          "Status": "Success",
          "ExistingJobStatus": "FINISHED", // optional
          "Message": "OK",
          "NumberTotalRows": 1000000,
          "NumberLoadedRows": 1000000,
          "NumberFilteredRows": 1,
          "NumberUnselectedRows": 0,
          "LoadBytes": 40888898,
          "LoadTimeMs": 2144,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 2,
          "ReadDataTimeMs": 325,
          "WriteDataTimeMs": 1933,
          "CommitAndPublishTimeMs": 106,
          "ErrorURL": "http://fe_host:http_port/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
      }
      • Stream load导入结果参数如下表。
        表3 参数说明

        参数

        说明

        TxnId

        导入的事务ID。用户可不感知。

        Label

        导入Label。由用户指定或系统自动生成。

        Status

        导入完成状态。

        • Success:表示导入成功。
        • Publish Timeout:该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
        • Label Already Exists:Label重复,需更换Label。
        • Fail:导入失败。

        ExistingJobStatus

        已存在的Label对应的导入作业的状态。

        这个字段只有在当Status为“Label Already Exists”时才会显示。用户可以通过这个状态,知晓已存在Label对应的导入作业的状态。“RUNNING”表示作业还在执行,“FINISHED”表示作业成功。

        Message

        导入错误信息。

        NumberTotalRows

        导入总处理的行数。

        NumberLoadedRows

        成功导入的行数。

        NumberFilteredRows

        数据质量不合格的行数。

        NumberUnselectedRows

        被where条件过滤的行数。

        LoadBytes

        导入的字节数。

        LoadTimeMs

        导入完成时间。单位毫秒。

        BeginTxnTimeMs

        向Fe请求开始一个事务所花费的时间,单位毫秒。

        StreamLoadPutTimeMs

        向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。

        ReadDataTimeMs

        读取数据所花费的时间,单位毫秒。

        WriteDataTimeMs

        执行写入数据操作所花费的时间,单位毫秒。

        CommitAndPublishTimeMs

        向Fe请求提交并且发布事务所花费的时间,单位毫秒。

        ErrorURL

        如果有数据质量问题,通过访问这个URL查看具体错误行。

        由于Stream load是同步的导入方式,所以并不会在Doris系统中记录导入信息,用户无法异步的通过查看导入命令看到Stream load。使用时需监听创建导入请求的返回值获取导入结果。

  • 示例2,json数据格式导入。

    准备json格式数据并保存为testjson.json,并将json数据上传至doris客户端:

    {"id": 100, "city": "B", "code" : 1}

    • 创建Doris表。
      CREATE TABLE `doris_testjson01` (
        `id` varchar(32) NOT NULL,
        `city` ARRAY<int(11)>,
        `code` int(11)
      ) ENGINE=OLAP
      DUPLICATE KEY(`id`)
      COMMENT "OLAP"
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
      'replication_allocation' = 'tag.location.default: 3',
      'in_memory' = 'false',
      'storage_format' = 'V2'
      );
    • curl命令进行load数据。
      curl --location-trusted -u admin:{doris集群密码} -H 'format: json' -T testjson.json  https://fe_host:http_port/api/{doris数据库}/doris_testjson01/_stream_load -k
    • 查询数据。
      select * from doris_testjson01;

取消导入

用户无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。

查看Stream load

用户可以通过show stream load来查看已经完成的stream load任务。

相关文档