使用Stream Load方式导入数据至Doris集群
Stream load是一个同步的导入方式,用户通过发送HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream load同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
Stream load主要适用于导入本地文件,或通过程序导入数据流中的数据。
基本原理
下图展示了Stream load的主要流程,省略了一些导入细节。
^ + | | | | 1A. User submit load to FE | | | +--v-----------+ | | FE | 5. Return result to user | +--+-----------+ | | | | 2. Redirect to BE | | | +--v-----------+ +---+Coordinator BE| 1B. User submit load to BE +-+-----+----+-+ | | | +-----+ | +-----+ | | | 3. Distribute data | | | +-v-+ +-v-+ +-v-+ |BE | |BE | |BE | +---+ +---+ +---+
Stream load中,Doris会选定一个节点作为Coordinator节点。该节点负责接数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。用户也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给用户。
基本操作

在执行数据导入Stream Load操作之前,必须确保Doris集群的安全组端口开放,即8030和8040端口,否则Stream Load操作将会连接超时。
- 创建导入
Stream Load通过HTTP协议提交和传输数据。这里通过curl命令展示如何提交导入。
用户也可以通过其他HTTP client进行操作。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

- Header中支持属性见下面的‘导入任务参数’说明。
- 格式为:-H "key1:value1"。
- port:HTTP的端口。
创建导入任务的详细语法可以通过HELP STREAM LOAD命令查看。Stream Load中所有与导入任务相关的参数均设置在Header中。相关参数描述如下表所示。
参数 |
说明 |
|
---|---|---|
签名参数 |
user/passwd |
Stream load由于创建导入的协议使用的是HTTP协议,通过Basic access authentication进行签名。Doris系统会根据签名验证用户身份和导入权限。 |
导入任务参数 |
label |
导入任务的标识。每个导入任务,都有一个在单database内部唯一的label。label是用户在导入命令中自定义的名称。通过这个label,用户可以查看对应导入任务的执行情况。 label的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。当label对应的导入作业状态为CANCELLED时,该label可以再次被使用。 |
column_separator |
用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。 如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。 可以使用多个字符的组合作为列分隔符。 |
|
line_delimiter |
用于指定导入文件中的换行符,默认为\n。 可以使用做多个字符的组合作为换行符。 |
|
max_filter_ratio |
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。 如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。 计算公式为: (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio dpp.abnorm.ALL表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。 dpp.norm.ALL指的是导入过程中正确数据的条数。可以通过SHOW LOAD命令查询导入任务的正确数据量。 原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL。 |
|
where |
导入任务指定的过滤条件。Stream load支持对原始数据指定where语句进行过滤。被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。 |
|
Partitions |
待导入表的Partition信息,如果待导入数据不属于指定的Partition则不会被导入。这些数据将计入dpp.abnorm.ALL。 |
|
columns |
待导入数据的函数变换配置,目前Stream load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 |
|
exec_mem_limit |
导入内存限制。默认为2GB,单位为字节。 |
|
strict_mode |
Stream Load导入可以开启strict mode模式。开启方式为在HEADER中声明strict_mode=true 。默认的strict mode为关闭。 strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下: 对于列类型转换来说,如果strict mode为true,则错误的数据将被filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。对于导入的某列由函数变换生成时,strict mode对其不产生影响。对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode对其也不产生影响。例如:如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据strict对其不产生影响。 |
|
merge_type |
数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,Delete表示删除与这批数据Key相同的所有行,MERGE语义需要与Delete条件联合使用,表示满足Delete条件的数据按照Delete语义处理其余的按照APPEND语义处理。 |
|
two_phase_commit |
Stream load导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。 示例:
|
- 示例1,CSV数据格式导入。
- 创建Doris表
CREATE TABLE cloudtable0327.doris_streameload_test01 ( user_id bigint, date date, group_id bigint, modify_date date, keyword VARCHAR(128) ) UNIQUE KEY(user_id, date, group_id) DISTRIBUTED BY HASH (user_id) BUCKETS 32 PROPERTIES( 'function_column.sequence_col' = 'modify_date', 'replication_num' = '3', 'in_memory' = 'false' );
- 准备数据表sequencedata01.csv。
表2 sequencedata01.csv 1
2020-02-22
1
2020-02-21
a
1
2020-02-22
1
2020-02-22
b
1
2020-02-22
1
2020-03-05
c
1
2020-02-22
1
2020-02-26
d
1
2020-02-22
1
2020-02-23
e
1
2020-02-22
1
2020-02-24
b
- 执行curl命令load数据。
curl -k --location-trusted -u admin:passwd -T sequencedata01.csv -H 'column_separator:,' https://{fe_host}:{http_port}/api/cloudtable0327/doris_streameload_test01/_stream_load
- 返回结果。
由于Stream load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://fe_host:http_port/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" }
- Stream load导入结果参数如下表。
表3 参数说明 参数
说明
TxnId
导入的事务ID。用户可不感知。
Label
导入Label。由用户指定或系统自动生成。
Status
导入完成状态。
- Success:表示导入成功。
- Publish Timeout:该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
- Label Already Exists:Label重复,需更换Label。
- Fail:导入失败。
ExistingJobStatus
已存在的Label对应的导入作业的状态。
这个字段只有在当Status为“Label Already Exists”时才会显示。用户可以通过这个状态,知晓已存在Label对应的导入作业的状态。“RUNNING”表示作业还在执行,“FINISHED”表示作业成功。
Message
导入错误信息。
NumberTotalRows
导入总处理的行数。
NumberLoadedRows
成功导入的行数。
NumberFilteredRows
数据质量不合格的行数。
NumberUnselectedRows
被where条件过滤的行数。
LoadBytes
导入的字节数。
LoadTimeMs
导入完成时间。单位毫秒。
BeginTxnTimeMs
向Fe请求开始一个事务所花费的时间,单位毫秒。
StreamLoadPutTimeMs
向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。
ReadDataTimeMs
读取数据所花费的时间,单位毫秒。
WriteDataTimeMs
执行写入数据操作所花费的时间,单位毫秒。
CommitAndPublishTimeMs
向Fe请求提交并且发布事务所花费的时间,单位毫秒。
ErrorURL
如果有数据质量问题,通过访问这个URL查看具体错误行。
由于Stream load是同步的导入方式,所以并不会在Doris系统中记录导入信息,用户无法异步的通过查看导入命令看到Stream load。使用时需监听创建导入请求的返回值获取导入结果。
- Stream load导入结果参数如下表。
- 创建Doris表
- 示例2,json数据格式导入。
准备json格式数据并保存为testjson.json,并将json数据上传至doris客户端:
{"id": 100, "city": "B", "code" : 1}
- 创建Doris表。
CREATE TABLE `doris_testjson01` ( `id` varchar(32) NOT NULL, `city` ARRAY<int(11)>, `code` int(11) ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( 'replication_allocation' = 'tag.location.default: 3', 'in_memory' = 'false', 'storage_format' = 'V2' );
- curl命令进行load数据。
curl --location-trusted -u admin:{doris集群密码} -H 'format: json' -T testjson.json https://fe_host:http_port/api/{doris数据库}/doris_testjson01/_stream_load -k
- 查询数据。
select * from doris_testjson01;
- 创建Doris表。
取消导入
用户无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。
查看Stream load
用户可以通过show stream load来查看已经完成的stream load任务。