Broker Load
Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。
Doris表中的数据是有序的,Broker Load在导入数据时要利用Doris集群资源对数据进行排序,相对于Spark Load来完成海量历史数据迁移,对Doris的集群资源占用比较大。Broker Load方式是在用户没有Spark计算资源的情况下使用,如果有Spark计算资源建议使用Spark Load。
用户需要通过MySQL协议创建Broker Load 导入,并通过查看导入命令检查导入结果。适用以下场景:
- 源数据在Broker可以访问的存储系统中,如HDFS。
- 数据量在几十到百GB级别。
- 支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。
前提条件
- 已创建包含Doris服务的集群,集群内各服务运行正常。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考安装MySQL客户端。
- Doris中已安装并启动DBroker实例。
- 已安装Hive客户端。
导入Hive表数据到Doris中
- 导入Text格式的Hive表数据到Doris中
- 执行以下命令登录Hive beeline命令行:
source bigdata_env
kinit 组件业务用户(若集群未启用Kerberos认证(普通模式)请跳过该操作)
beeline
- 执行以下命令在default库创建Hive表,分区字段为“c4”:
CREATE TABLE test_table(
`c1` int,
`c2` int,
`c3` string)
PARTITIONED BY (c4 string)
row format delimited fields terminated by ','lines terminated by '\n' stored as textfile ;
- 执行以下命令插入数据到Hive表中:
insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p数据库登录用户密码 -PFE查询连接端口 -hDoris FE实例IP地址
- Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 执行以下命令创建Doris表:
CREATE TABLE example_db.test_t1 (
`c1` int NOT NULL,
`c4` date NULL,
`c2` int NOT NULL,
`c3` String NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`c1`, `c4`)
PARTITION BY RANGE(`c4`)
(
PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
DISTRIBUTED BY HASH(`c1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.buckets" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
- 执行以下命令导入数据:
- 集群已启用Kerberos认证(安全模式)
LOAD LABEL broker_load_2022_03_23
(
DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*")
INTO TABLE test_t1
COLUMNS TERMINATED BY ","
(c1,c2,c3)
COLUMNS FROM PATH AS (`c4`)
SET
(
c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
)
)
WITH BROKER "broker_192_168_67_78"
(
"hadoop.security.authentication"="kerberos",
"kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",
"kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.1/install/FusionInsight-Doris-2.0.3/doris-fe/bin/doris.keytab"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
- 集群未启用Kerberos认证(普通模式)
LOAD LABEL broker_load_2022_03_23
(
DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*")
INTO TABLE test_t1
COLUMNS TERMINATED BY ","
(c1,c2,c3)
COLUMNS FROM PATH AS (`c4`)
SET
(
c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
)
)
WITH BROKER "broker_192_168_67_78"
(
"username"="hdfs",
"password"=""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
- 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
- RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
- broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
- 命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
- 集群已启用Kerberos认证(安全模式)
- 执行以下命令查看导入任务的状态信息:
show load order by createtime desc limit 1\G;
JobId: 41326624 Label: broker_load_2022_03_23 State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27 TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1 ErrorMsg: NULL CreateTime: 2022-04-01 18:59:06 EtlStartTime: 2022-04-01 18:59:11 EtlFinishTime: 2022-04-01 18:59:11 LoadStartTime: 2022-04-01 18:59:11 LoadFinishTime: 2022-04-01 18:59:11 URL: NULL JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540} 1 row in set (0.01 sec)
- 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的 Label ,命令为:
CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";
例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
- 执行以下命令登录Hive beeline命令行:
- 导入ORC格式的Hive表数据到Doris中
- 执行以下命令登录Hive beeline命令行:
source bigdata_env
kinit 组件业务用户(若集群未启用Kerberos认证(普通模式)请跳过该操作)
beeline
- 执行以下命令在default库创建ORC格式的Hive表:
CREATE TABLE test_orc_tbl(
`c1` int,
`c2` int,
`c3` string)
PARTITIONED BY (c4 string)
row format delimited fields terminated by ','lines terminated by '\n' stored as orc;
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p数据库登录用户密码 -PFE查询连接端口 -hDoris FE实例IP地址
- Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 执行以下命令创建Doris表:
CREATE TABLE example_db.test_orc_t1 (
`c1` int NOT NULL,
`c4` date NULL,
`c2` int NOT NULL,
`c3` String NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`c1`, `c4`)
PARTITION BY RANGE(`c4`)
(
PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
DISTRIBUTED BY HASH(`c1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.buckets" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
- 执行以下命令使用Broker Load 导入数据:
- 集群已启用Kerberos认证(安全模式):
LOAD LABEL broker_load_2022_03_24
(
DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*")
INTO TABLE test_orc_t1
COLUMNS TERMINATED BY ","
FORMAT AS "orc"
(c1,c2,c3)
COLUMNS FROM PATH AS (`c4`)
SET
(
c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
)
)
WITH BROKER "broker_192_168_67_78"
(
"hadoop.security.authentication"="kerberos",
"kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",
"kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.1/install/FusionInsight-Doris-2.0.3/doris-fe/bin/doris.keytab"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
- 集群未启用Kerberos认证(普通模式)
LOAD LABEL broker_load_2022_03_24
(
DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*")
INTO TABLE test_orc_t1
COLUMNS TERMINATED BY ","
FORMAT AS "orc"
(c1,c2,c3)
COLUMNS FROM PATH AS (`c4`)
SET
(
c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
)
)
WITH BROKER "broker_192_168_67_78"
(
'username"="hdfs",
'password"=""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
- FORMAT AS "orc" :已指定待导入的数据格式为ORC。
- SET:定义 Hive 表和 Doris 表之间的字段映射关系及字段转换的规则。
- 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
- RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
- broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
- 集群已启用Kerberos认证(安全模式):
- 执行以下命令查看导入任务的状态信息:
- 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的 Label ,命令为:
CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";
例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
- 执行以下命令登录Hive beeline命令行:
相关参数配置
以下配置属于Broker Load的系统级别配置,作用于所有Broker Load导入任务。
登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增以下参数:
- min_bytes_per_broker_scanner:用于限制了单个BE处理的数据量的最小值,默认值为:64MB,单位为:bytes。
- max_bytes_per_broker_scanner:用于限制了单个BE处理的数据量的最大值,默认值为:3G,单位为:bytes。
- max_broker_concurrency:用于限制一个作业的最大的导入并发数,默认值为:10。
最小处理的数据量,最大并发数,源文件的大小和当前集群BE节点的个数共同决定了本次任务导入的并发数:
- 本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
- 本次导入单个BE的处理量 = 源文件大小/本次导入的并发数
通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner * BE节点数。如果需要导入更大数据量,则需要适当调整“max_bytes_per_broker_scanner”参数的大小。