更新时间:2024-09-14 GMT+08:00
分享

Broker Load

Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。本文为您介绍Broker Load导入的基本原理、基本操作、系统配置以及最佳实践。

适用场景

  • 源数据在Broker可以访问的存储系统中,如HDFS、OBS。
  • 数据量在几十到百GB级别。

基本原理

用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。

BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。

+
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |   BE  |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+

开始导入

下面我们通过几个实际的场景示例来看Broker Load的使用。

数据样例:

'100','101','102','103','104','105',100.00,100.01,100.02,'100',200,100.08,2022-04-01

'101','102','103','104','105','105',100.00,100.01,100.02,'100',200,100.08,2022-04-02

'102','103','104','105','106','105',100.00,100.01,100.02,'100',200,100.08,2022-04-03

准备工作:

在本地创建示例数据文件source_text.txt,并上传至hdfs的/tmp/。

在hive中创建ods_source表。
CREATE TABLE `ods_source`(
  `id` string, 
  `store_id` string, 
  `company_id` string, 
  `tower_id` string, 
  `commodity_id` string, 
  `commodity_name` string, 
  `commodity_price` double, 
  `member_price` double, 
  `cost_price` double, 
  `unit` string, 
  `quantity` string, 
  `actual_price` double,
  `day ` string
)
row format delimited fields terminated by ','
lines terminated by '\n'
stored as textfile;
将hdfs创建的txt文件导入到ods_source表。
load data inpath '/tmp/source_text.txt' into table ods_source;
  • 示例1,Parquet格式表导入。
    • 在hive中创建parquet分区表并写入数据。
      • 创建ods_demo_detail表。
        CREATE TABLE `ods_demo_detail`(
          `id` string, 
          `store_id` string, 
          `company_id` string, 
          `tower_id` string, 
          `commodity_id` string, 
          `commodity_name` string, 
          `commodity_price` double, 
          `member_price` double, 
          `cost_price` double, 
          `unit` string, 
          `quantity` string, 
          `actual_price` double
        )
        PARTITIONED BY (day string)
        row format delimited fields terminated by ','
        lines terminated by '\n'
        stored as textfile;
      • 把ods_source表中的数据导入到ods_demo_detail表中。
        set hive.exec.dynamic.partition.mode=nonstrict;
        set hive.exec.dynamic.partition=true;
        insert overwrite table ods_demo_detail partition(day) select * from ods_source;
    • 查看Hive表ods_demo_detail有没有数据。
      select * from ods_demo_detail;
    • 在Doris创建数据库。
      create database doris_demo_db;
    • 创建Doris表doris_ods_test_detail。

      如果创建集群的时候不是超高io,则去掉'storage_medium' = 'SSD'。

      use doris_demo_db;
      CREATE TABLE `doris_ods_test_detail` (
        `rq` date NULL,
        `id` varchar(32) NOT NULL,
        `store_id` varchar(32) NULL,
        `company_id` varchar(32) NULL,
        `tower_id` varchar(32) NULL,
        `commodity_id` varchar(32) NULL,
        `commodity_name` varchar(500) NULL,
        `commodity_price` decimal(10, 2) NULL,
        `member_price` decimal(10, 2) NULL,
        `cost_price` decimal(10, 2) NULL,
        `unit` varchar(50) NULL,
        `quantity` int(11) NULL,
        `actual_price` decimal(10, 2) NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`rq`, `id`, `store_id`)
      PARTITION BY RANGE(`rq`)
      (
      PARTITION P_202204 VALUES [('2022-04-01'),('2022-08-30')))
      DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
      PROPERTIES (
      'replication_allocation' = 'tag.location.default: 3',
      'dynamic_partition.enable' = 'true',
      'dynamic_partition.time_unit' = 'MONTH',
      'dynamic_partition.start' = '-2147483648',
      'dynamic_partition.end' = '2',
      'dynamic_partition.prefix' = 'P_',
      'dynamic_partition.buckets' = '1',
      'in_memory' = 'false',
      'storage_format' = 'V2',
      'storage_medium' = 'SSD'
      );
    • 导入数据。
      LOAD LABEL broker_name_test01
      (
          DATA INFILE('hdfs://{hdfs远端ip}:{hdfs远端端口号}/user/hive/warehouse/ods_demo_detail/*/*')
          INTO TABLE doris_ods_test_detail
          COLUMNS TERMINATED BY ','  (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) 
          COLUMNS FROM PATH AS (`day`)
         SET 
         (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
          )
      WITH BROKER 'broker1' 
          ( 
            'username' = 'hdfs', 
            'password' = '' 
          )
      PROPERTIES
      (
          'timeout'='1200',
          'max_filter_ratio'='0.1'
      );
    • 查看导入状态
      您可以通过下面的命令查看上面导入任务的状态信息。
      show load order by createtime desc limit 1\G;
      如果状态信息出现"Scan bytes per file scanner exceed limit: 3221225472",说明导入失败,需要修改参数"max_bytes_per_broker_scanner",请参见Doris参数配置章节的FE节点参数说明表。
      图1 查看数据导入状态
  • 示例2,ORC格式表导入。
    • 在Hive中创建Hive分区表,ORC格式。
      CREATE TABLE `ods_demo_orc_detail`(
        `id` string, 
        `store_id` string, 
        `company_id` string, 
        `tower_id` string, 
        `commodity_id` string, 
        `commodity_name` string, 
        `commodity_price` double, 
        `member_price` double, 
        `cost_price` double, 
        `unit` string, 
        `quantity` double, 
        `actual_price` double
      )
      PARTITIONED BY (day string)
      row format delimited fields terminated by ','
      lines terminated by '\n'
      STORED AS ORC;
    • 查询Source表写入分区表。
      insert overwrite table ods_demo_orc_detail partition(day) select * from ods_source;
    • 创建Doris表。
      CREATE TABLE `doris_ods_orc_detail` (
        `rq` date NULL,
        `id` varchar(32) NOT NULL,
        `store_id` varchar(32) NULL,
        `company_id` varchar(32) NULL,
        `tower_id` varchar(32) NULL,
        `commodity_id` varchar(32) NULL,
        `commodity_name` varchar(500) NULL,
        `commodity_price` decimal(10, 2) NULL,
        `member_price` decimal(10, 2) NULL,
        `cost_price` decimal(10, 2) NULL,
        `unit` varchar(50) NULL,
        `quantity` int(11) NULL,
        `actual_price` decimal(10, 2) NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`rq`, `id`, `store_id`)
      PARTITION BY RANGE(`rq`)
      (
      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-08-30')))
      DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
      PROPERTIES (
      'replication_allocation' = 'tag.location.default: 3',
      'dynamic_partition.enable' = 'true',
      'dynamic_partition.time_unit' = 'MONTH',
      'dynamic_partition.start' = "-2147483648",
      'dynamic_partition.end' = '2',
      'dynamic_partition.prefix' = 'P_',
      'dynamic_partition.buckets' = '1',
      'in_memory' = 'false',
      'storage_format' = 'V2');
    • 导入数据。
      LOAD LABEL orc_2022_02_17
      (
          DATA INFILE("hdfs://{hdfs远端ip}:{hdfs远端端口号}/user/hive/warehouse/ods_demo_orc_detail/*/*")
          INTO TABLE doris_ods_orc_detail
          COLUMNS TERMINATED BY ","
          FORMAT AS 'orc'
      (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) 
          COLUMNS FROM PATH AS (`day`)
         SET 
         (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
          )
      WITH BROKER 'broker1' 
          ( 
            'username' = 'hdfs', 
            'password'= "" 
          )
      PROPERTIES
      (
          'timeout'="1200",
          'max_filter_ratio'="0.1"
      );
    • 查询导入数据。
      show load order by createtime desc limit 1\G;
  • 示例3,OBS格式数据导入。
    • 创建Doris表。
      CREATE TABLE `obs_detail_test` (
        `id` varchar(32) NOT NULL,
        `store_id` varchar(32) NULL,
        `company_id` varchar(32) NULL,
        `tower_id` varchar(32) NULL,
        `commodity_id` varchar(32) NULL,
        `commodity_name` varchar(500) NULL,
        `commodity_price` decimal(10, 2) NULL,
        `member_price` decimal(10, 2) NULL,
        `cost_price` decimal(10, 2) NULL,
        `unit` varchar(50) NULL,
        `quantity` int(11) NULL,
        `actual_price` decimal(10, 2) NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`id`, `store_id`)
      DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
      PROPERTIES (
      'replication_allocation' = 'tag.location.default: 3',
      'in_memory' = 'false',
      'storage_format' = 'V2'
      );
    • 将OBS数据导入到Doris表。

      构造text数据100条,该数据与Doris表字段对应,将数据上传到OBS桶。

      fs.obs.access.key, fs.obs.secret.key,fs.obs.endpoint获取方式如下所示:

      LOAD LABEL label_2023021801
      (
      DATA INFILE("obs://xxx/source_text2.txt")
      INTO TABLE `obs_detail_test`
      COLUMNS TERMINATED BY "," (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
      ) 
      WITH BROKER 'broker1' (
      'fs.obs.access.key' = 'xxx',
      'fs.obs.secret.key' = 'xxxxxx',
      'fs.obs.endpoint' = 'xxxxxx'
      )
      PROPERTIES
      (
      'timeout'="1200",
      'max_filter_ratio'='0.1'
      );
    • 查询数据。
      show load order by createtime desc limit 1\G;
  • 示例4,使用With HDFS方式将HDFS的数据导入Doris表。
    • 创建Doris表。
      CREATE TABLE `ods_dish_detail_test` (
        `id` varchar(32) NOT NULL,
        `store_id` varchar(32) NULL,
        `company_id` varchar(32) NULL,
        `tower_id` varchar(32) NULL,
        `commodity_id` varchar(32) NULL,
        `commodity_name` varchar(500) NULL,
        `commodity_price` decimal(10, 2) NULL,
        `member_price` decimal(10, 2) NULL,
        `cost_price` decimal(10, 2) NULL,
        `unit` varchar(50) NULL,
        `quantity` int(11) NULL,
        `actual_price` decimal(10, 2) NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`id`, `store_id`)
      DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
      PROPERTIES (
      'replication_allocation' = 'tag.location.default: 3',
      'in_memory' = 'false',
      'storage_format' = 'V2'
      ); 
    • 导入数据。

      构造text数据100条,该数据与Doris表字段对应。

      LOAD LABEL label_2023021703
              (
                  DATA INFILE("hdfs://{hdfs远端ip}:{hdfs远端端口号}/tmp/source_text2.txt")
                  INTO TABLE `ods_dish_detail_test`
                  COLUMNS TERMINATED BY "," (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
              ) 
              with HDFS (
                  'fs.defaultFS'="hdfs://{hdfs远端ip}:{hdfs远端端口号}",
                  'hadoop.username'="hdfs",
                  'password'=""
              )
              PROPERTIES
              (
                  'timeout'="1200",
                  'max_filter_ratio'='0.1'
              );
    • 查询数据。
      show load order by createtime desc limit 1\G;

取消导入

当Broker load作业状态不为CANCELLED或FINISHED时,可以被用户手动取消。取消时需要指定待取消导入任务的Label 。

相关系统配置

  • FE配置。

    下面几个配置属于Broker load的系统级别配置,也就是作用于所有Broker load导入任务的配置。主要通过修改FE配置项来调整配置值。

    • max_bytes_per_broker_scanner/max_broker_concurrency

      max_bytes_per_broker_scanner配置限制了单个BE处理的数据量的最大值。max_broker_concurrency配置限制了一个作业的最大的导入并发数。最小处理的数据量(默认64M),最大并发数,源文件的大小和当前集群BE的个数 共同决定了本次导入的并发数。

      本次导入并发数=Math.min(源文件大小/最小处理量(默认64M),最大并发数,当前BE节点个数)。
      本次导入单个BE的处理量=源文件大小/本次导入的并发数。

      通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner*BE节点数。如果需要导入更大数据量,则需要适当调整max_bytes_per_broker_scanner参数的大小。

      默认配置:

      • 参数名:max_broker_concurrency, 默认10。
      • 参数名:max_bytes_per_broker_scanner,默认3G,单位bytes。

最佳实践

  • 应用场景。

    使用Broker load最适合的场景就是原始数据在文件系统(HDFS,BOS,AFS)中的场景。其次,由于Broker load是单次导入中唯一的一种异步导入的方式,所以如果用户在导入大文件中,需要使用异步接入,也可以考虑使用Broker load。

  • 数据量。

    这里仅讨论单个BE的情况,如果用户集群有多个BE则下面标题中的数据量应该乘以BE个数来计算。比如:如果用户有3个BE,则3G以下(包含)则应该乘以3,也就是9G以下(包含)。

    • 3G以下(包含):用户可以直接提交Broker load创建导入请求。
    • 3G以上:由于单个导入BE最大的处理量为3G,超过3G的待导入文件就需要通过调整Broker load的导入参数来实现大文件的导入。
      • 根据当前BE的个数和原始文件的大小修改单个BE的最大扫描量和最大并发数。
        修改fe配置项。
        max_broker_concurrency=BE个数。
        当前导入任务单个BE处理的数据量=原始文件大小/max_broker_concurrency。
        max_bytes_per_broker_scanner >=当前导入任务单个BE处理的数据量。
        比如一个100G的文件,集群的BE个数为10个。
        max_broker_concurrency=10。
        max_bytes_per_broker_scanner >=10G=100G/10。

        修改后,所有的BE会并发的处理导入任务,每个BE处理原始文件的一部分。

        上述两个FE中的配置均为系统配置,也就是说其修改是作用于所有的Broker load的任务的。

      • 在创建导入的时候自定义当前导入任务的timeout时间。

        这时候不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间4小时,最好是通过切分待导入文件并且分多次导入来解决问题。主要原因是:单次导入超过4小时的话,导入失败后重试的时间成本很高。

        可以通过如下公式计算出Doris集群期望最大导入文件数据量:

        期望最大导入文件数据量=14400s*10M/s*BE个数。
        比如:集群的BE个数为10个。
        期望最大导入文件数据量=14400s*10M/s*10 =1440000M≈1440G。

        一般用户的环境可能达不到10M/s的速度,所以建议超过500G的文件都进行文件切分,再导入。

作业调度

系统会限制一个集群内正在运行的Broker Load作业数量,以防止同时运行过多的Load作业。

首先,FE的配置参数:desired_max_waiting_jobs会限制一个集群内未开始或正在运行(作业状态为PENDING或LOADING)的Broker Load作业数量。默认为100。如果超过这个阈值,新提交的作业将会被直接拒绝。

一个Broker Load作业会被分为pending task和loading task阶段。其中pending task负责获取导入文件的信息,而loading task会发送给BE执行具体的导入任务。

FE的配置参数async_pending_load_task_pool_size用于限制同时运行的pending task的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为10。也就是说,假设用户提交了100个Load作业,同时只会有10个作业会进入LOADING状态开始执行,而其他作业处于PENDING等待状态。

FE的配置参数async_loading_load_task_pool_size用于限制同时运行的loading task的任务数量。一个Broker Load作业会有1 pending task和多个loading task(等于LOAD语句中DATA INFILE子句的个数)。所以async_loading_load_task_pool_size应该大于等于async_pending_load_task_pool_size。

性能分析

可以在提交LOAD作业前,先执行set enable_profile=true打开会话变量。然后提交导入作业。待导入作业完成后,可以在FE的web页面的Queris标签中查看到导入作业的Profile。

相关文档