使用Broker Load导入数据至StarRocks集群
StarRocks支持从Apache HDFS、OBS等外部存储系统导入数据,支持CSV、ORCFile、Parquet等文件格式,默认支持导入CSV格式数据,数据量在几十GB到上百GB级别。
在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(如HDFS、OBS)上的数据,利用自身的计算资源对数据进行预处理和导入。Broker Load是一种异步的导入方式,需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。
前提条件
- 创建弹性云服务器,请参考购买ECS章节。
- 创建StarRocks集群,请参考创建StarRocks存算一体集群。
- 已安装MySQL客户端,请参考手动安装MySQL客户端
- 将本地主机IP地址加入ECS安全组中,保证本地主机可以访问ECS。
从OBS导入数据至StarRocks集群
- 在本地文件系统下,创建两个CSV格式的数据文件,file1.csv和file2.csv。两个数据文件都包含三列,分别代表用户ID、用户姓名和用户得分,如下所示。
- file1.csv
1,Lily,21 2,Rose,22 3,Alice,23 4,Julia,24
- file2.csv
5,Tony,25 6,Adam,26 7,Allen,27 8,Jacky,28
- file1.csv
- 将file1.csv和file2.csv上传到云存储空间的指定路径下,假设上传到OBS存储空间bucket_obs里的input文件夹下。
- 连接StarRocks集群。
使用SSH登录工具,通过弹性IP远程登录到Linux弹性云服务器。具体登录操作步骤请参见弹性云服务器《用户指南》中的“ SSH密码方式登录”。
./mysql -uadmin -ppassword -h集群内网地址 -P9030
- 创建两张主键表,table1和table2。两张表都包含id、name和score三列,分别代表用户ID、用户姓名和用户得分,主键为id列,如下所示:
- 创建数据库test_db,使用数据库test_db。
CREATE DATABASE test_db;
USE test_db;
- 创建表table1。
CREATE TABLE `table1` ( `id` int(11) NOT NULL COMMENT "用户 ID", `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名", `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`); - 创建table2。
CREATE TABLE `table2` ( `id` int(11) NOT NULL COMMENT "用户 ID", `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名", `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);
- 创建数据库test_db,使用数据库test_db。
- 从OBS导入数据至StarRocks集群。
- 导入单个数据文件到单表。
LOAD LABEL test_db.label_brokerloadtest_601 ( DATA INFILE("obs://bucket_obs/input/file1.csv") INTO TABLE table1 COLUMNS TERMINATED BY "," (id, name, score) ) WITH BROKER ( "fs.obs.access.key" = "<obs_access_key>", "fs.obs.secret.key" = "<obs_secret_key>", "fs.obs.endpoint" = "<obs_endpoint>" ) PROPERTIES ( "timeout" = "3600" );fs.obs.access.key、fs.obs.secret.key、fs.obs.endpoint获取方式如下所示:
- fs.obs.access.key与fs.obs.secret.key的获取方式请参见对象存储服务的获取访问密钥(AK/SK)章节。
- fs.obs.endpoint的获取方式见请参见对象存储服务的获取终端节点章节。
- 导入多个数据文件到单表。
LOAD LABEL test_db.label_brokerloadtest_602 ( DATA INFILE("obs://bucket_obs/input/*") INTO TABLE table1 COLUMNS TERMINATED BY "," (id, name, score) ) WITH BROKER ( "fs.obs.access.key" = "<obs_access_key>", "fs.obs.secret.key" = "<obs_secret_key>", "fs.obs.endpoint" = "<obs_endpoint>" ) PROPERTIES ( "timeout" = "3600" ); - 导入多个数据文件到多表。
LOAD LABEL test_db.label_brokerloadtest_603 ( DATA INFILE("obs://bucket_obs/input/file1.csv") INTO TABLE table1 COLUMNS TERMINATED BY "," (id, name, score) , DATA INFILE("obs://bucket_obs/input/file2.csv") INTO TABLE table2 COLUMNS TERMINATED BY "," (id, name, score) ) WITH BROKER ( "fs.obs.access.key" = "<obs_access_key>", "fs.obs.secret.key" = "<obs_secret_key>", "fs.obs.endpoint" = "<obs_endpoint>" ); PROPERTIES ( "timeout" = "3600" );
- 导入单个数据文件到单表。
- 查看导入作业。
- 查看test_db数据库中导入作业的执行情况,同时指定查询结果根据作业创建时间(CREATE_TIME)按降序排列,并且最多显示两条结果数据。
SELECT * FROM information_schema.loads WHERE database_name = 'test_db' ORDER BY create_time DESC LIMIT 2\G
- 查看test_db数据库中标签为label_brokerload_unqualifiedtest_82的导入作业的执行情况。
SELECT * FROM information_schema.loads WHERE database_name = 'test_db' and label = 'label_brokerload_unqualifiedtest_82'\G
- 查看test_db数据库中导入作业的执行情况,同时指定查询结果根据作业创建时间(CREATE_TIME)按降序排列,并且最多显示两条结果数据。
从HDFS导入数据StarRocks集群
- 连接StarRocks集群。
使用SSH登录工具,通过弹性IP远程登录到Linux弹性云服务器。具体登录操作步骤请参见弹性云服务器《用户指南》中的“ SSH密码方式登录”。
./mysql -uadmin -ppassword -h集群内网地址 -P9030
- 执行以下命令创建表。
- 创建数据库test_db,使用数据库test_db。
CREATE DATABASE test_db;
USE test_db;
- 创建用来存储text格式数据的sr_hdfs_txt表。
CREATE TABLE sr_hdfs_txt ( `c1` int NOT NULL, `c4` date NULL, `c2` int NOT NULL, `c3` String NOT NULL ) ENGINE=OLAP UNIQUE KEY(`c1`, `c4`) PARTITION BY RANGE(`c4`) ( PARTITION P202204 VALUES [('2022-04-01'), ('2022-05-01'))) DISTRIBUTED BY HASH(`c1`) BUCKETS 1 PROPERTIES ( "dynamic_partition.time_unit" = "MONTH", "dynamic_partition.end" = "2" ); - 创建用来存储ORC格式数据的表。
CREATE TABLE sr_hdfs_orc ( `c1` int NOT NULL, `c4` date NULL, `c2` int NOT NULL, `c3` String NOT NULL ) ENGINE=OLAP UNIQUE KEY(`c1`, `c4`) PARTITION BY RANGE(`c4`) ( PARTITION P202204 VALUES [('2022-04-01'), ('2022-05-01'))) DISTRIBUTED BY HASH(`c1`) BUCKETS 1 PROPERTIES ( "dynamic_partition.time_unit" = "MONTH", "dynamic_partition.end" = "2" ); - 参数描述。
- dynamic_partition.time_unit(必选):动态分区的时间粒度,时间粒度会决定动态创建的分区名后缀格式。
- HOUR:仅支持Datetime类型,动态创建的分区名后缀格式为:yyyyMMddHH,例如:2023032101。
- DAY时:动态创建的分区名后缀格式为:yyyyMMdd,例如:20230321。
- WEEK:动态创建的分区名后缀格式为:yyyy_ww,例如:2023_13 ,表示2023年第13周。
- MONTH:动态创建的分区名后缀格式为:yyyyMM,例如:202303。
- YEAR:动态创建的分区名后缀格式为:yyyy,例如:2023。
- dynamic_partition.end(必选):提前创建的分区数量,取值范围为正整数。根据“dynamic_partition.time_unit”属性的不同,以当天/周/月为基准,提前创建对应范围的分区。
- dynamic_partition.time_unit(必选):动态分区的时间粒度,时间粒度会决定动态创建的分区名后缀格式。
- 创建数据库test_db,使用数据库test_db。
- 执行以下命令导入数据。
- 导入text格式数据。
LOAD LABEL broker_load_2024_02_23 ( DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/user/hive/warehouse/test_table/*/*") INTO TABLE sr_hdfs_txt COLUMNS TERMINATED BY "," (c1,c2,c3) COLUMNS FROM PATH AS (`c4`) SET ( c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3 ) ) WITH BROKER "broker_192_168_2_151" ( "username"="hdfs", "password"="" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
- 导入ORC格式数据。
LOAD LABEL broker_load_2024_03_2 ( DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/user/hive/warehouse/test_orc_tbl/*/*") INTO TABLE sr_hdfs_orc COLUMNS TERMINATED BY "," FORMAT AS "orc" (c1,c2,c3) COLUMNS FROM PATH AS (`c4`) SET ( c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3 ) ) WITH BROKER "broker_192_168_2_151" ( "username"="hdfs", "password"="" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );表1 数据导入参数描述 参数
描述
LABEL
导入任务的标识。每个导入任务,都有一个数据库内部唯一的Label。Label是用户在导入命令中自定义的名称。
hdfs_ip
HDFS实例IP地址。获取IP地址方法如下:
MRS中HDFS组件:进入Manager界面,单击“集群 > 服务 > HDFS > 实例”查看。
hdfs_port
HDFS实例RPC端口号。
MRS中HDFS组件:进入Manager界面,单击“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
FORMAT AS
用于指定导入文件的类型,例如:parquet、orc、csv,默认为csv。
COLUMNS FROM PATH AS
提取文件路径中的分区字段。
INTO TABLE
需要导入数据的StartRocks表。
broker_192_168_2_151
表示Broker名称,可在MySQL客户端执行show broker;命令查看。
- 导入text格式数据。
- 执行以下命令查看导入任务的状态信息。
show load order by createtime desc limit 1\G;

表2 导入任务返回结果参数描述 参数
描述
JobId
导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。
Label
导入任务的标识。
State
- 导入任务当前所处的阶段。
- PENDING:表示当前导入任务正在等待被执行。
- LOADING:表示导入任务正在执行中。
- CANCELLED:表示任务导入失败。
- FINISHED:表示任务导入成功。
Progress
导入任务的进度描述。包括ETL和LOAD,分别对应导入流程的两个阶段ETL和LOADING。
TaskInfo
主要显示了当前导入任务参数,即创建Broker Load导入任务时用户指定的参数,包括:cluster、timeout和max-filter-ratio。
ErrorMsg
如果导入任务状态为“CANCELLED”,会显示失败的原因,包括type和msg两部分。如果导入任务成功则显示“N/A”。
JobDetails
显示作业的详细运行状态,包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点ID,未完成的BE节点ID。
- 导入任务当前所处的阶段。
- 执行以下命令查看表内是否有数据:
select * from sr_hdfs_txt;

- 手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的Label ,操作命令如下。
CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";
例如:取消数据库demo中Label为broker_load_2022_03_23的导入作业。
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";