使用Stream Load导入数据至StarRocks集群
StarRocks支持使用Stream Load方式从本地直接导入CSV格式的数据,数据量需小于10GB,该导入方式是通过用户发送HTTP请求将本地文件或数据流导入到StarRocks中,Stream Load同步执行导入并返回结果,用户通过返回结果判断是否导入成功。
Stream Load主要适用于导入本地文件,或通过程序导入数据流中的数据。支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。
使用限制
- 导入CSV格式的数据时,需要确保每行数据结尾都有行分隔符。
- Stream Load任务在执行后无法手动取消,任务超时或者导入错误后会被系统自动取消。
前提条件
- 创建弹性云服务器,请参考购买ECS章节。
- 创建StarRocks集群,请参考创建StarRocks存算一体集群。
- 已安装MySQL客户端,请参考手动安装MySQL客户端
- 将本地主机IP地址加入ECS安全组中,保证本地主机可以访问ECS。
从本地导入数据至StarRocks集群中
以下示例为使用Stream Load将test.csv文件中的数据导入到表users中。
- 使用SSH登录工具,通过弹性IP远程登录到Linux弹性云服务器。具体登录操作步骤请参见弹性云服务器《用户指南》中的“ SSH密码方式登录”。
- 在安装MySQL客户端的路径下创建test.csv文件,文件内容为:
1001,'test1',123456@.qqcom,'测试地址1',18,1 1002,'test2',123456@.qqcom,'测试地址2',18,1 1003,'test3',123456@.qqcom,'测试地址3',20,0 1004,'test4',123456@.qqcom,'测试地址4',21,1 1005,'test5',123456@.qqcom,'测试地址5',23,0 1006,'test6',123456@.qqcom,'测试地址6',22,1 1007,'test7',123456@.qqcom,'测试地址7',18,0 1008,'test8',123456@.qqcom,'测试地址8',25,1 1009,'test9',123456@.qqcom,'测试地址9',19,0 1010,'test10',123456@.qqcom,'测试地址10',10,1 1011,'test11',123456@.qqcom,'测试地址11',18,1
- 连接StarRocks集群。
./mysql -uadmin -ppassword -h集群内网地址 -P9030
- 执行以下命令创建表。
- 创建数据库cloudtable_demo。
create database cloudtable_demo;
- 使用cloudtable_demo数据库。
use cloudtable_demo;
- 创建表users。
CREATE TABLE users ( user_id BIGINT NOT NULL, NAME STRING NOT NULL, email STRING NULL, address STRING NULL, age TINYINT NULL, sex TINYINT NULL ) PRIMARY KEY (user_id) DISTRIBUTED BY HASH(user_id) BUCKETS 4;
- 创建数据库cloudtable_demo。
- 执行以下命令,把test.csv文件中的数据导入到users表中:
- 非事务级别导入数据。
curl --location-trusted -u admin:<password> -T test.csv -H "column_separator:," -XPUT http://StarRocks FE实例IP地址:HTTP端口号/api/数据库名/表名/_stream_load
- 事务级别导入数据。
- 执行以下命令开启事务。
curl --location-trusted -u admin:<password> -H "label:<labelName>" \ -H "Expect:100-continue" \ -H "db:数据库名" -H "table:表名" \ -XPOST http://StarRocks FE实例IP地址:HTTP端口号/api/transaction/begin
- 执行导入数据操作。
curl --location-trusted -u admin:<password> -H "label:<labelName>" \ -H "Expect:100-continue" \ -H "db:数据库名" -H "table:表名" \ -T <filePath> \ -H "column_separator: ," \ -XPUT http://StarRocks FE实例IP地址:HTTP端口号/api/transaction/load
- 提交事务。
curl --location-trusted -u admin:<password> -H "label:<labelName>" \ -H "Expect:100-continue" \ -H "db:数据库名" \ -XPOST http://StarRocks FE实例IP地址:HTTP端口号/api/transaction/commit
表1 事务级别导入数据参数描述 参数
描述
HTTP端口
默认是8030。
StarRocks FE节点IP地址
单击“集群名称 > 详情”,进入详情页获取FE节点IP地址。
password
为创建集群时的用户密码。
labelName
标签名称,取值唯一。
-T
数据所在文件名。
-H
表示头部信息。column_separator为导入数据文件中字段间隔符,默认为“\t”,也可设置为“,”,例如:curl --location-trusted -u root -T test.csv -H "column_separator:," -XPUT http://192.168.2.151:8030/api/mrs_demo/users/_stream_load
- 执行以下命令开启事务。
- 非事务级别导入数据。
- 查看数据是否导入成功。
- 方法一:Stream Load是同步导入操作,命令执行完成后即可查看数据是否导入成功,导入结果如下。
表2 Stream Load导入任务结果参数说明 参数
参数描述
TxnId
导入的事务ID。
Label
导入Label,由用户指定或系统自动生成。
Status
导入完成状态。
- Success:表示导入成功。
- Publish Timeout:该状态也表示导入已完成,只是数据可能会延迟可见,无需重试。
- Label Already Exists:Label重复,需更换Label。
- Fail:导入失败。
ExistingJobStatus
已存在的Label对应的导入作业的状态。该字段只有当Status为“Label Already Exists”时才显示。可以通过这个状态,查看已存在Label对应的导入作业的状态。
- RUNNING:表示作业正在执行中。
- FINISHED:表示作业执行成功。
Message
导入错误信息。
NumberTotalRows
导入总处理的行数。
NumberLoadedRows
成功导入的行数。
NumberFilteredRows
数据不合格的行数。
NumberUnselectedRows
被where条件过滤的行数。
LoadBytes
导入的字节数。
LoadTimeMs
导入完成时间,单位为:毫秒。
BeginTxnTimeMs
向FE请求开始一个事务所花费的时间,单位为:毫秒。
StreamLoadPutTimeMs
向FE请求获取导入数据执行计划所花费的时间,单位为:毫秒。
ReadDataTimeMs
读取数据所花费的时间,单位为:毫秒。
WriteDataTimeMs
执行写入数据操作所花费的时间,单位为:毫秒。
CommitAndPublishTimeMs
向FE请求提交并且发布事务所花费的时间,单位为:毫秒。
ErrorURL
如果有数据问题,通过访问该URL可查看具体错误行。
- 方法二:执行查询表操作,判断数据是否导入成功,查看的表数据与1文件中的数据一致则表示导入成功。
select * from users;

- 方法一:Stream Load是同步导入操作,命令执行完成后即可查看数据是否导入成功,导入结果如下。
