Broker Load
Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。本文为您介绍Broker Load导入的基本原理、基本操作、系统配置以及最佳实践。
适用场景
- 源数据在Broker可以访问的存储系统中,如HDFS、OBS。
- 数据量在几十到百GB级别。
基本原理
用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。
BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。
+ | 1. user create broker load v +----+----+ | | | FE | | | +----+----+ | | 2. BE etl and load the data +--------------------------+ | | | +---v---+ +--v----+ +---v---+ | | | | | | | BE | | BE | | BE | | | | | | | +---+-^-+ +---+-^-+ +--+-^--+ | | | | | | | | | | | | 3. pull data from broker +---v-+-+ +---v-+-+ +--v-+--+ | | | | | | |Broker | |Broker | |Broker | | | | | | | +---+-^-+ +---+-^-+ +---+-^-+ | | | | | | +---v-+-----------v-+----------v-+-+ | HDFS/BOS/AFS cluster | | | +----------------------------------+
开始导入
下面我们通过几个实际的场景示例来看Broker Load的使用。
数据样例:
'100','101','102','103','104','105',100.00,100.01,100.02,'100',200,100.08,2022-04-01
'101','102','103','104','105','105',100.00,100.01,100.02,'100',200,100.08,2022-04-02
'102','103','104','105','106','105',100.00,100.01,100.02,'100',200,100.08,2022-04-03
准备工作:
在本地创建示例数据文件source_text.txt,并上传至hdfs的/tmp/。
CREATE TABLE `ods_source`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` string, `actual_price` double, `day ` string ) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;
load data inpath '/tmp/source_text.txt' into table ods_source;
- 示例1,Parquet格式表导入。
- 在hive中创建parquet分区表并写入数据。
- 创建ods_demo_detail表。
CREATE TABLE `ods_demo_detail`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` string, `actual_price` double ) PARTITIONED BY (day string) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;
- 把ods_source表中的数据导入到ods_demo_detail表中。
set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.dynamic.partition=true; insert overwrite table ods_demo_detail partition(day) select * from ods_source;
- 创建ods_demo_detail表。
- 查看Hive表ods_demo_detail有没有数据。
select * from ods_demo_detail;
- 在Doris创建数据库。
create database doris_demo_db;
- 创建Doris表doris_ods_test_detail。
如果创建集群的时候不是超高io,则去掉'storage_medium' = 'SSD'。
use doris_demo_db; CREATE TABLE `doris_ods_test_detail` ( `rq` date NULL, `id` varchar(32) NOT NULL, `store_id` varchar(32) NULL, `company_id` varchar(32) NULL, `tower_id` varchar(32) NULL, `commodity_id` varchar(32) NULL, `commodity_name` varchar(500) NULL, `commodity_price` decimal(10, 2) NULL, `member_price` decimal(10, 2) NULL, `cost_price` decimal(10, 2) NULL, `unit` varchar(50) NULL, `quantity` int(11) NULL, `actual_price` decimal(10, 2) NULL ) ENGINE=OLAP UNIQUE KEY(`rq`, `id`, `store_id`) PARTITION BY RANGE(`rq`) ( PARTITION P_202204 VALUES [('2022-04-01'),('2022-08-30'))) DISTRIBUTED BY HASH(`store_id`) BUCKETS 1 PROPERTIES ( 'replication_allocation' = 'tag.location.default: 3', 'dynamic_partition.enable' = 'true', 'dynamic_partition.time_unit' = 'MONTH', 'dynamic_partition.start' = '-2147483648', 'dynamic_partition.end' = '2', 'dynamic_partition.prefix' = 'P_', 'dynamic_partition.buckets' = '1', 'in_memory' = 'false', 'storage_format' = 'V2', 'storage_medium' = 'SSD' );
- 导入数据。
LOAD LABEL broker_name_test01 ( DATA INFILE('hdfs://{hdfs远端ip}:{hdfs远端端口号}/user/hive/warehouse/ods_demo_detail/*/*') INTO TABLE doris_ods_test_detail COLUMNS TERMINATED BY ',' (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) COLUMNS FROM PATH AS (`day`) SET (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price) ) WITH BROKER 'broker1' ( 'username' = 'hdfs', 'password' = '' ) PROPERTIES ( 'timeout'='1200', 'max_filter_ratio'='0.1' );
- 查看导入状态
如果状态信息出现"Scan bytes per file scanner exceed limit: 3221225472",说明导入失败,需要修改参数"max_bytes_per_broker_scanner",请参见Doris参数配置章节的FE节点参数说明表。图1 查看数据导入状态
- 在hive中创建parquet分区表并写入数据。
- 示例2,ORC格式表导入。
- 在Hive中创建Hive分区表,ORC格式。
CREATE TABLE `ods_demo_orc_detail`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` double, `actual_price` double ) PARTITIONED BY (day string) row format delimited fields terminated by ',' lines terminated by '\n' STORED AS ORC;
- 查询Source表写入分区表。
insert overwrite table ods_demo_orc_detail partition(day) select * from ods_source;
- 创建Doris表。
CREATE TABLE `doris_ods_orc_detail` ( `rq` date NULL, `id` varchar(32) NOT NULL, `store_id` varchar(32) NULL, `company_id` varchar(32) NULL, `tower_id` varchar(32) NULL, `commodity_id` varchar(32) NULL, `commodity_name` varchar(500) NULL, `commodity_price` decimal(10, 2) NULL, `member_price` decimal(10, 2) NULL, `cost_price` decimal(10, 2) NULL, `unit` varchar(50) NULL, `quantity` int(11) NULL, `actual_price` decimal(10, 2) NULL ) ENGINE=OLAP UNIQUE KEY(`rq`, `id`, `store_id`) PARTITION BY RANGE(`rq`) ( PARTITION P_202204 VALUES [('2022-04-01'), ('2022-08-30'))) DISTRIBUTED BY HASH(`store_id`) BUCKETS 1 PROPERTIES ( 'replication_allocation' = 'tag.location.default: 3', 'dynamic_partition.enable' = 'true', 'dynamic_partition.time_unit' = 'MONTH', 'dynamic_partition.start' = "-2147483648", 'dynamic_partition.end' = '2', 'dynamic_partition.prefix' = 'P_', 'dynamic_partition.buckets' = '1', 'in_memory' = 'false', 'storage_format' = 'V2');
- 导入数据。
LOAD LABEL orc_2022_02_17 ( DATA INFILE("hdfs://{hdfs远端ip}:{hdfs远端端口号}/user/hive/warehouse/ods_demo_orc_detail/*/*") INTO TABLE doris_ods_orc_detail COLUMNS TERMINATED BY "," FORMAT AS 'orc' (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) COLUMNS FROM PATH AS (`day`) SET (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price) ) WITH BROKER 'broker1' ( 'username' = 'hdfs', 'password'= "" ) PROPERTIES ( 'timeout'="1200", 'max_filter_ratio'="0.1" );
- 查询导入数据。
show load order by createtime desc limit 1\G;
- 在Hive中创建Hive分区表,ORC格式。
- 示例3,OBS格式数据导入。
- 创建Doris表。
CREATE TABLE `obs_detail_test` ( `id` varchar(32) NOT NULL, `store_id` varchar(32) NULL, `company_id` varchar(32) NULL, `tower_id` varchar(32) NULL, `commodity_id` varchar(32) NULL, `commodity_name` varchar(500) NULL, `commodity_price` decimal(10, 2) NULL, `member_price` decimal(10, 2) NULL, `cost_price` decimal(10, 2) NULL, `unit` varchar(50) NULL, `quantity` int(11) NULL, `actual_price` decimal(10, 2) NULL ) ENGINE=OLAP UNIQUE KEY(`id`, `store_id`) DISTRIBUTED BY HASH(`store_id`) BUCKETS 1 PROPERTIES ( 'replication_allocation' = 'tag.location.default: 3', 'in_memory' = 'false', 'storage_format' = 'V2' );
- 将OBS数据导入到Doris表。
构造text数据100条,该数据与Doris表字段对应,将数据上传到OBS桶。
fs.obs.access.key, fs.obs.secret.key,fs.obs.endpoint获取方式如下所示:
- fs.obs.access.key与fs.obs.secret.key的获取方式请参见对象存储服务的获取访问密钥(AK/SK)章节。
- fs.obs.endpoint的获取方式见请参见对象存储服务的获取终端节点章节。
LOAD LABEL label_2023021801 ( DATA INFILE("obs://xxx/source_text2.txt") INTO TABLE `obs_detail_test` COLUMNS TERMINATED BY "," (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) ) WITH BROKER 'broker1' ( 'fs.obs.access.key' = 'xxx', 'fs.obs.secret.key' = 'xxxxxx', 'fs.obs.endpoint' = 'xxxxxx' ) PROPERTIES ( 'timeout'="1200", 'max_filter_ratio'='0.1' );
- 查询数据。
show load order by createtime desc limit 1\G;
- 创建Doris表。
- 示例4,使用With HDFS方式将HDFS的数据导入Doris表。
- 创建Doris表。
CREATE TABLE `ods_dish_detail_test` ( `id` varchar(32) NOT NULL, `store_id` varchar(32) NULL, `company_id` varchar(32) NULL, `tower_id` varchar(32) NULL, `commodity_id` varchar(32) NULL, `commodity_name` varchar(500) NULL, `commodity_price` decimal(10, 2) NULL, `member_price` decimal(10, 2) NULL, `cost_price` decimal(10, 2) NULL, `unit` varchar(50) NULL, `quantity` int(11) NULL, `actual_price` decimal(10, 2) NULL ) ENGINE=OLAP UNIQUE KEY(`id`, `store_id`) DISTRIBUTED BY HASH(`store_id`) BUCKETS 1 PROPERTIES ( 'replication_allocation' = 'tag.location.default: 3', 'in_memory' = 'false', 'storage_format' = 'V2' );
- 导入数据。
LOAD LABEL label_2023021703 ( DATA INFILE("hdfs://{hdfs远端ip}:{hdfs远端端口号}/tmp/source_text2.txt") INTO TABLE `ods_dish_detail_test` COLUMNS TERMINATED BY "," (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) ) with HDFS ( 'fs.defaultFS'="hdfs://{hdfs远端ip}:{hdfs远端端口号}", 'hadoop.username'="hdfs", 'password'="" ) PROPERTIES ( 'timeout'="1200", 'max_filter_ratio'='0.1' );
- 查询数据。
show load order by createtime desc limit 1\G;
- 创建Doris表。
取消导入
当Broker load作业状态不为CANCELLED或FINISHED时,可以被用户手动取消。取消时需要指定待取消导入任务的Label 。
相关系统配置
- FE配置。
下面几个配置属于Broker load的系统级别配置,也就是作用于所有Broker load导入任务的配置。主要通过修改FE配置项来调整配置值。
- max_bytes_per_broker_scanner/max_broker_concurrency
max_bytes_per_broker_scanner配置限制了单个BE处理的数据量的最大值。max_broker_concurrency配置限制了一个作业的最大的导入并发数。最小处理的数据量(默认64M),最大并发数,源文件的大小和当前集群BE的个数 共同决定了本次导入的并发数。
本次导入并发数=Math.min(源文件大小/最小处理量(默认64M),最大并发数,当前BE节点个数)。 本次导入单个BE的处理量=源文件大小/本次导入的并发数。
通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner*BE节点数。如果需要导入更大数据量,则需要适当调整max_bytes_per_broker_scanner参数的大小。
默认配置:
- 参数名:max_broker_concurrency, 默认10。
- 参数名:max_bytes_per_broker_scanner,默认3G,单位bytes。
- max_bytes_per_broker_scanner/max_broker_concurrency
最佳实践
- 应用场景。
使用Broker load最适合的场景就是原始数据在文件系统(HDFS,BOS,AFS)中的场景。其次,由于Broker load是单次导入中唯一的一种异步导入的方式,所以如果用户在导入大文件中,需要使用异步接入,也可以考虑使用Broker load。
- 数据量。
这里仅讨论单个BE的情况,如果用户集群有多个BE则下面标题中的数据量应该乘以BE个数来计算。比如:如果用户有3个BE,则3G以下(包含)则应该乘以3,也就是9G以下(包含)。
- 3G以下(包含):用户可以直接提交Broker load创建导入请求。
- 3G以上:由于单个导入BE最大的处理量为3G,超过3G的待导入文件就需要通过调整Broker load的导入参数来实现大文件的导入。
- 根据当前BE的个数和原始文件的大小修改单个BE的最大扫描量和最大并发数。
修改fe配置项。 max_broker_concurrency=BE个数。 当前导入任务单个BE处理的数据量=原始文件大小/max_broker_concurrency。 max_bytes_per_broker_scanner >=当前导入任务单个BE处理的数据量。 比如一个100G的文件,集群的BE个数为10个。 max_broker_concurrency=10。 max_bytes_per_broker_scanner >=10G=100G/10。
修改后,所有的BE会并发的处理导入任务,每个BE处理原始文件的一部分。
上述两个FE中的配置均为系统配置,也就是说其修改是作用于所有的Broker load的任务的。
- 在创建导入的时候自定义当前导入任务的timeout时间。
这时候不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间4小时,最好是通过切分待导入文件并且分多次导入来解决问题。主要原因是:单次导入超过4小时的话,导入失败后重试的时间成本很高。
可以通过如下公式计算出Doris集群期望最大导入文件数据量:
期望最大导入文件数据量=14400s*10M/s*BE个数。 比如:集群的BE个数为10个。 期望最大导入文件数据量=14400s*10M/s*10 =1440000M≈1440G。
一般用户的环境可能达不到10M/s的速度,所以建议超过500G的文件都进行文件切分,再导入。
- 根据当前BE的个数和原始文件的大小修改单个BE的最大扫描量和最大并发数。
作业调度
系统会限制一个集群内正在运行的Broker Load作业数量,以防止同时运行过多的Load作业。
首先,FE的配置参数:desired_max_waiting_jobs会限制一个集群内未开始或正在运行(作业状态为PENDING或LOADING)的Broker Load作业数量。默认为100。如果超过这个阈值,新提交的作业将会被直接拒绝。
一个Broker Load作业会被分为pending task和loading task阶段。其中pending task负责获取导入文件的信息,而loading task会发送给BE执行具体的导入任务。
FE的配置参数async_pending_load_task_pool_size用于限制同时运行的pending task的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为10。也就是说,假设用户提交了100个Load作业,同时只会有10个作业会进入LOADING状态开始执行,而其他作业处于PENDING等待状态。
FE的配置参数async_loading_load_task_pool_size用于限制同时运行的loading task的任务数量。一个Broker Load作业会有1 pending task和多个loading task(等于LOAD语句中DATA INFILE子句的个数)。所以async_loading_load_task_pool_size应该大于等于async_pending_load_task_pool_size。
性能分析
可以在提交LOAD作业前,先执行set enable_profile=true打开会话变量。然后提交导入作业。待导入作业完成后,可以在FE的web页面的Queris标签中查看到导入作业的Profile。