更新时间:2026-01-06 GMT+08:00
分享

使用Broker Load导入数据至StarRocks集群

StarRocks支持从Apache HDFS、OBS等外部存储系统导入数据,支持CSV、ORCFile、Parquet等文件格式,默认支持导入CSV格式数据,数据量在几十GB到上百GB级别。

在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(如HDFS、OBS)上的数据,利用自身的计算资源对数据进行预处理和导入。Broker Load是一种异步的导入方式,需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。

前提条件

从OBS导入数据至StarRocks集群

  1. 在本地文件系统下,创建两个CSV格式的数据文件,file1.csv和file2.csv。两个数据文件都包含三列,分别代表用户ID、用户姓名和用户得分,如下所示。

    • file1.csv
      1,Lily,21
      2,Rose,22
      3,Alice,23
      4,Julia,24
    • file2.csv
      5,Tony,25
      6,Adam,26
      7,Allen,27
      8,Jacky,28

  2. 将file1.csv和file2.csv上传到云存储空间的指定路径下,假设上传到OBS存储空间bucket_obs里的input文件夹下。
  3. 连接StarRocks集群。

    使用SSH登录工具,通过弹性IP远程登录到Linux弹性云服务器。具体登录操作步骤请参见弹性云服务器《用户指南》中的“ SSH密码方式登录”。

    ./mysql -uadmin -ppassword -h集群内网地址 -P9030

  4. 创建两张主键表,table1和table2。两张表都包含id、name和score三列,分别代表用户ID、用户姓名和用户得分,主键为id列,如下所示:

    • 创建数据库test_db,使用数据库test_db。
      CREATE DATABASE test_db;
      USE test_db;
    • 创建表table1。
      CREATE TABLE `table1`
         (
             `id` int(11) NOT NULL COMMENT "用户 ID",
             `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
             `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
         )
             ENGINE=OLAP
             PRIMARY KEY(`id`)
             DISTRIBUTED BY HASH(`id`);
    • 创建table2。
      CREATE TABLE `table2`
         (
             `id` int(11) NOT NULL COMMENT "用户 ID",
             `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
             `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
         )
             ENGINE=OLAP
             PRIMARY KEY(`id`)
             DISTRIBUTED BY HASH(`id`);

  5. 从OBS导入数据至StarRocks集群。

    • 导入单个数据文件到单表。
      LOAD LABEL test_db.label_brokerloadtest_601
      (
          DATA INFILE("obs://bucket_obs/input/file1.csv")
          INTO TABLE table1
          COLUMNS TERMINATED BY ","
          (id, name, score)
      )
      WITH BROKER
      (
          "fs.obs.access.key" = "<obs_access_key>",
          "fs.obs.secret.key" = "<obs_secret_key>",
          "fs.obs.endpoint" = "<obs_endpoint>"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );

      fs.obs.access.key、fs.obs.secret.key、fs.obs.endpoint获取方式如下所示:

    • 导入多个数据文件到单表。
      LOAD LABEL test_db.label_brokerloadtest_602
      (
          DATA INFILE("obs://bucket_obs/input/*")
          INTO TABLE table1
          COLUMNS TERMINATED BY ","
          (id, name, score)
      )
      WITH BROKER
      (
          "fs.obs.access.key" = "<obs_access_key>",
          "fs.obs.secret.key" = "<obs_secret_key>",
          "fs.obs.endpoint" = "<obs_endpoint>"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
    • 导入多个数据文件到多表。
      LOAD LABEL test_db.label_brokerloadtest_603
      (
          DATA INFILE("obs://bucket_obs/input/file1.csv")
          INTO TABLE table1
          COLUMNS TERMINATED BY ","
          (id, name, score)
          ,
          DATA INFILE("obs://bucket_obs/input/file2.csv")
          INTO TABLE table2
          COLUMNS TERMINATED BY ","
          (id, name, score)
      )
      WITH BROKER
      (
          "fs.obs.access.key" = "<obs_access_key>",
          "fs.obs.secret.key" = "<obs_secret_key>",
          "fs.obs.endpoint" = "<obs_endpoint>"
      );
      PROPERTIES
      (
          "timeout" = "3600"
      );

  6. 查看导入作业。

    • 查看test_db数据库中导入作业的执行情况,同时指定查询结果根据作业创建时间(CREATE_TIME)按降序排列,并且最多显示两条结果数据。
      SELECT * FROM information_schema.loads
      WHERE database_name = 'test_db'
      ORDER BY create_time DESC
      LIMIT 2\G
    • 查看test_db数据库中标签为label_brokerload_unqualifiedtest_82的导入作业的执行情况。
      SELECT * FROM information_schema.loads
      WHERE database_name = 'test_db' and label = 'label_brokerload_unqualifiedtest_82'\G

从HDFS导入数据StarRocks集群

  1. 连接StarRocks集群。

    使用SSH登录工具,通过弹性IP远程登录到Linux弹性云服务器。具体登录操作步骤请参见弹性云服务器《用户指南》中的“ SSH密码方式登录”。

    ./mysql -uadmin -ppassword -h集群内网地址 -P9030

  2. 执行以下命令创建表。

    • 创建数据库test_db,使用数据库test_db。
      CREATE DATABASE test_db;
      USE test_db;
    • 创建用来存储text格式数据的sr_hdfs_txt表。
      CREATE TABLE sr_hdfs_txt (
      `c1` int NOT NULL,
      `c4` date NULL,
      `c2` int NOT NULL,
      `c3` String NOT NULL
      )
      ENGINE=OLAP
      UNIQUE KEY(`c1`, `c4`)
      PARTITION BY RANGE(`c4`)
      (
      PARTITION P202204 VALUES [('2022-04-01'), ('2022-05-01')))
      DISTRIBUTED BY HASH(`c1`) BUCKETS 1
      PROPERTIES (
      "dynamic_partition.time_unit" = "MONTH",
      "dynamic_partition.end" = "2"
      );
    • 创建用来存储ORC格式数据的表。
      CREATE TABLE sr_hdfs_orc (
      `c1` int NOT NULL,
      `c4` date NULL,
      `c2` int NOT NULL,
      `c3` String NOT NULL
      )
      ENGINE=OLAP
      UNIQUE KEY(`c1`, `c4`)
      PARTITION BY RANGE(`c4`)
      (
      PARTITION P202204 VALUES [('2022-04-01'), ('2022-05-01')))
      DISTRIBUTED BY HASH(`c1`) BUCKETS 1
      PROPERTIES (
      "dynamic_partition.time_unit" = "MONTH",
      "dynamic_partition.end" = "2"
      );
    • 参数描述。
      • dynamic_partition.time_unit(必选):动态分区的时间粒度,时间粒度会决定动态创建的分区名后缀格式。
        • HOUR:仅支持Datetime类型,动态创建的分区名后缀格式为:yyyyMMddHH,例如:2023032101。
        • DAY时:动态创建的分区名后缀格式为:yyyyMMdd,例如:20230321。
        • WEEK:动态创建的分区名后缀格式为:yyyy_ww,例如:2023_13 ,表示2023年第13周。
        • MONTH:动态创建的分区名后缀格式为:yyyyMM,例如:202303。
        • YEAR:动态创建的分区名后缀格式为:yyyy,例如:2023。
      • dynamic_partition.end(必选):提前创建的分区数量,取值范围为正整数。根据“dynamic_partition.time_unit”属性的不同,以当天/周/月为基准,提前创建对应范围的分区。

  3. 执行以下命令导入数据。

    • 导入text格式数据。
      LOAD LABEL broker_load_2024_02_23
      (
      DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/user/hive/warehouse/test_table/*/*")
      INTO TABLE sr_hdfs_txt
      COLUMNS TERMINATED BY ","
      (c1,c2,c3)
      COLUMNS FROM PATH AS (`c4`)
      SET
      (
      c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
      )
      )
      WITH BROKER "broker_192_168_2_151"
      (
      "username"="hdfs",
      "password"=""
      )
      PROPERTIES
      (
      "timeout"="1200",
      "max_filter_ratio"="0.1"
      );
    • 导入ORC格式数据。
      LOAD LABEL broker_load_2024_03_2
      (
      DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/user/hive/warehouse/test_orc_tbl/*/*")
      INTO TABLE sr_hdfs_orc
      COLUMNS TERMINATED BY ","
      FORMAT AS "orc"
      (c1,c2,c3)
      COLUMNS FROM PATH AS (`c4`)
      SET
      (
      c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
      )
      )
      WITH BROKER "broker_192_168_2_151"
      (
      "username"="hdfs",
      "password"=""
      )
      PROPERTIES
      (
      "timeout"="1200",
      "max_filter_ratio"="0.1"
      );
      表1 数据导入参数描述

      参数

      描述

      LABEL

      导入任务的标识。每个导入任务,都有一个数据库内部唯一的Label。Label是用户在导入命令中自定义的名称。

      hdfs_ip

      HDFS实例IP地址。获取IP地址方法如下:

      MRS中HDFS组件:进入Manager界面,单击“集群 > 服务 > HDFS > 实例”查看。

      hdfs_port

      HDFS实例RPC端口号。

      MRS中HDFS组件:进入Manager界面,单击“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。

      FORMAT AS

      用于指定导入文件的类型,例如:parquet、orc、csv,默认为csv。

      COLUMNS FROM PATH AS

      提取文件路径中的分区字段。

      INTO TABLE

      需要导入数据的StartRocks表。

      broker_192_168_2_151

      表示Broker名称,可在MySQL客户端执行show broker;命令查看。

  4. 执行以下命令查看导入任务的状态信息。

    show load order by createtime desc limit 1\G;

    表2 导入任务返回结果参数描述

    参数

    描述

    JobId

    导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。

    Label

    导入任务的标识。

    State

    • 导入任务当前所处的阶段。
      • PENDING:表示当前导入任务正在等待被执行。
      • LOADING:表示导入任务正在执行中。
      • CANCELLED:表示任务导入失败。
      • FINISHED:表示任务导入成功。

    Progress

    导入任务的进度描述。包括ETL和LOAD,分别对应导入流程的两个阶段ETL和LOADING。

    TaskInfo

    主要显示了当前导入任务参数,即创建Broker Load导入任务时用户指定的参数,包括:clustertimeoutmax-filter-ratio

    ErrorMsg

    如果导入任务状态为“CANCELLED”,会显示失败的原因,包括type和msg两部分。如果导入任务成功则显示“N/A”。

    JobDetails

    显示作业的详细运行状态,包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点ID,未完成的BE节点ID。

  5. 执行以下命令查看表内是否有数据:

    select * from sr_hdfs_txt;

  6. 手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的Label ,操作命令如下。

    CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";

    例如:取消数据库demo中Label为broker_load_2022_03_23的导入作业。

    CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

相关文档