更新时间:2024-11-29 GMT+08:00

Stream Load

Stream Load是一个同步的导入方式,用户通过HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream Load同步执行导入并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。

Stream Load主要适用于导入本地文件,或通过程序导入数据流中的数据。支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。

语法介绍

  • 创建Stream Load导入​任务

    Stream Load通过HTTP协议提交和传输数据。该操作通过curl命令演示如何提交导入,也可以使用其他HTTP Client进行操作。

    • 集群已启用Kerberos认证(安全模式)

      curl -k --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT https://Doris FE实例IP地址:HTTPS端口/api/{数据库名称}/{表名}/_stream_load

    • 集群未启用Kerberos认证(普通模式)

      curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://Doris FE实例IP地址:HTTP端口/api/{数据库名称}/{表名}/_stream_load

    Doris FE实例IP地址可在Manager界面,选择“集群 > 服务 > Doris > 实例”查看。

    HTTPS端口号可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看。

    HTTP端口号可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“http_port”查看。

    表1介绍了创建Stream Load任务的其他部分参数。

    表1 Stream Load任务参数介绍

    参数

    描述

    签名参数

    user:passwd

    Stream Load创建导入的协议使用的是HTTP协议,通过Basic access authentication进行签名。Doris系统会根据签名验证用户身份和导入权限。

    导入任务参数(格式为:-H "key1:value1"

    label

    导入任务的标识。每个导入任务,都有一个在单database内部唯一的label。label是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。

    column_separator

    用于指定导入文件中的列分隔符,默认为\t,可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"

    line_delimiter

    用于指定导入文件中的换行符,默认为\n,可以使用做多个字符的组合作为换行符。

    max_filter_ratio

    导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。

    where

    导入任务指定的过滤条件。Stream Load支持对原始数据指定where语句进行过滤,被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected

    Partitions

    待导入表的Partition信息,如果待导入数据不属于指定的Partition,则不会被导入,这些数据将计入dpp.abnorm.ALL。

    columns

    待导入数据的函数变换配置,目前 Stream Load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句一致。

    format

    指定导入数据格式,支持CSV、JSON、Parquet、ORC等,默认是CSV。

    exec_mem_limit

    导入内存限制,默认为:2GB,单位为:字节。

    strict_mode

    Stream Load导入可以开启strict mode模式,在HEADER中声明strict_mode=true即可开启,默认关闭strict mode。

    strict mode用于对导入过程中的列类型转换进行严格过滤,策略如下:

    • 对于列类型转换来说,如果strict mode为“true”,则错误的数据将被filter。错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的数据。
    • 对于导入的某列由函数变换生成时,strict mode对其不产生影响。
    • 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制,strict mode对其也不产生影响。例如:如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,这种数据 strict对其不产生影响。

    merge_type

    数据的合并类型,支持APPEND、DELETE和MERGE三种类型,默认为APPEND。APPEND表示这批数据需要全部追加到现有数据中;DELETE表示删除与这批数据Key相同的所有行;MERGE语义需要与delete条件联合使用,满足delete条件的数据按照DELETE语义处理,其余的按照APPEND语义处理。

    two_phase_commit

    Stream Load导入可以开启两阶段事务提交模式。在Stream Load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为“PRECOMMITTED”,用户手动触发commit操作之后,数据才可见。

    enable_profile

    当“enable_profile”为“true”时,Stream Load profile将会打印到日志中,否则不会打印。

  • 返回结果

    由于Stream Load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回,例如:

    {
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
    }

    导入结果参数介绍如表2所示。

    表2 Stream Load导入任务结果参数介绍

    参数

    描述

    TxnId

    导入的事务ID。用

    Label

    导入Label,由用户指定或系统自动生成。

    Status

    导入完成状态,包括:

    • Success:表示导入成功。
    • Publish Timeout:该状态也表示导入已完成,只是数据可能会延迟可见,无需重试。
    • Label Already Exists:Label重复,需更换Label。
    • Fail:导入失败。

    ExistingJobStatus

    已存在的Label对应的导入作业的状态。

    该字段只有当Status为"Label Already Exists"时才显示。可以通过这个状态,查看已存在Label对应的导入作业的状态。"RUNNING" 表示作业还在执行,"FINISHED"表示作业执行成功。

    Message

    导入错误信息。

    NumberTotalRows

    导入总处理的行数。

    NumberLoadedRows

    成功导入的行数。

    NumberFilteredRows

    数据不合格的行数。

    NumberUnselectedRows

    被where条件过滤的行数。

    LoadBytes

    导入的字节数

    LoadTimeMs

    导入完成时间,单位为:毫秒。

    BeginTxnTimeMs

    向FE请求开始一个事务所花费的时间,单位为:毫秒。

    StreamLoadPutTimeMs

    向FE请求获取导入数据执行计划所花费的时间,单位为:毫秒。

    ReadDataTimeMs

    读取数据所花费的时间,单位为:毫秒。

    WriteDataTimeMs

    执行写入数据操作所花费的时间,单位为:毫秒。

    CommitAndPublishTimeMs

    向FE请求提交并且发布事务所花费的时间,单位为:毫秒。

    ErrorURL

    如果有数据问题,通过访问该URL查看具体错误行。

    由于Stream Load是同步的导入方式,所以不会在Doris系统中记录导入信息,用户无法异步通过查看导入命令看到Stream Load。使用时需查看创建导入请求的返回值获取导入结果。

  • 取消导入​

    用户无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。

  • 查看Stream Load​

    用户可以通过show stream load查看已经完成的Stream Load任务。

    默认BE不记录Stream Load的导入信息,如果需要查看需在配置enable_stream_load_record=true参数启用记录。

前提条件

  • 已创建包含Doris服务的集群,集群内各服务运行正常。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考安装MySQL客户端

Stream Load任务示例

  1. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

    集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -u数据库登录用户 -p数据库登录用户密码 -PFE查询连接端口 -hDoris FE实例IP地址

    • Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
    • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
    • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。

  2. 执行以下命令创建数据库:

    create database if not exists example_db;

  3. 执行以下命令创建表:

    CREATE TABLE example_db.test_stream_tbl (

    `c1` int NOT NULL,

    `c2` int NOT NULL,

    `c3` string NOT NULL,

    `c4` date NOT NULL

    ) ENGINE=OLAP

    UNIQUE KEY(`c1`, `c2`)

    DISTRIBUTED BY HASH(`c1`) BUCKETS 1;

  4. 创建数据文件“data.csv”,内容为:

    1,1,1,2020-02-21
    2,2,2,2020-03-21
    3,3,3,2020-04-21

  5. 使用Stream Load导入“data.csv”中的数据到3创建的表中:

    • 集群已启用Kerberos认证(安全模式)

      curl -k --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv https://Doris FE实例IP地址:HTTPS端口/api/example_db/test_stream_tbl/_stream_load

    • 集群未启用Kerberos认证(普通模式)

      curl --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv http://Doris FE实例IP地址:HTTP端口/api/example_db/test_stream_tbl/_stream_load

  6. 执行以下命令查看表数据:

    select * from example_db.test_stream_tbl;

相关参数配置

登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > 全部配置”,新增如下参数:

  • 选择“FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增参数:

    stream_load_default_timeout_second:表示导入任务的超时时间(单位为秒),若导入任务在设定的时间内未完成则会被系统取消,状态变为“CANCELLED”。默认超时时间为600秒,如果导入的源文件无法在规定时间内完成导入,可以在Stream Load请求中设置单独的超时时间,或调整“stream_load_default_timeout_second”参数值设置全局的默认超时时间。

  • 选择“BE(角色) > 自定义”,在自定义参数“be.conf.customized.configs”中新增参数:

    streaming_load_max_mb:表示Stream Load的最大导入文件大小,默认值为10G,单位为MB。如果原始文件超过该值,则需要适当调整该参数值。