使用Broker Load方式导入数据至Doris
Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。
Doris表中的数据是有序的,Broker Load在导入数据时要利用Doris集群资源对数据进行排序,相对于Spark Load来完成海量历史数据迁移,对Doris的集群资源占用比较大。Broker Load方式是在用户没有Spark计算资源的情况下使用,如果有Spark计算资源建议使用Spark Load。
用户需要通过MySQL协议创建Broker Load导入,并通过查看导入命令检查导入结果。适用以下场景:
- 源数据在Broker可以访问的存储系统中,如HDFS。
- 数据量在几十到百GB级别。
- 支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。
前提条件
- 已创建包含Doris服务的集群,集群内各服务运行正常。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。
- Doris中已安装并启动DBroker实例。
- 已安装Hive客户端。
- 如果Doris通过Broker Load跨集群导入数据,需要配置跨集群互信,相关操作可参考配置跨Manager集群互信。
导入Hive表数据到Doris中
执行Broker Load导入任务前,可根据实际需求修改相关参数配置中的参数,以配置合理的导入任务并发数及导入作业支持的最大数量。
- 导入Text格式的Hive表数据到Doris中
- 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:
cd 客户端安装目录
加载环境变量
source bigdata_env
认证用户,如果集群未启用Kerberos认证(普通模式),请跳过该操作:
kinit 组件业务用户
登录Hive客户端:
beeline
- 执行以下命令在default库创建Hive表,分区字段为“c4”:
CREATE TABLE test_table( `c1` int, `c2` int, `c3` string) PARTITIONED BY (c4 string) row format delimited fields terminated by ','lines terminated by '\n' stored as textfile ;
- 执行以下命令插入数据到Hive表中:
insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
如果集群已启用Kerberos认证(安全模式),需先执行以下命令:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
连接Doris数据库:
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 如果Hive组件和Doris组件是跨集群部署,需要修改以下配置:
- Doris所在集群的Doris的“hadoop.rpc.protection”配置项的值需与Hive所在集群中的HDFS组件的该配置项的值保持一致。
- 需修改Doris所在集群的DBroker的“BROKER_GC_OPTS”配置项的“-Djava.security.krb5.conf”参数,值为拷贝Hive所在集群的任一HiveServer实例节点的“$BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf” 文件到Doris所在集群的任一DBroker的任意目录。
- 执行以下命令创建Doris表:
CREATE TABLE example_db.test_t1 ( `c1` int NOT NULL, `c4` date NULL, `c2` int NOT NULL, `c3` String NOT NULL ) ENGINE=OLAP UNIQUE KEY(`c1`, `c4`) PARTITION BY RANGE(`c4`) ( PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))) DISTRIBUTED BY HASH(`c1`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3", "dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "MONTH", "dynamic_partition.start" = "-2147483648", "dynamic_partition.end" = "2", "dynamic_partition.prefix" = "P_", "dynamic_partition.buckets" = "1", "in_memory" = "false", "storage_format" = "V2" );
- 执行以下命令导入数据:
- 集群已启用Kerberos认证(安全模式)
LOAD LABEL broker_load_2022_03_23 ( DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*") INTO TABLE test_t1 COLUMNS TERMINATED BY "," (c1,c2,c3) COLUMNS FROM PATH AS (`c4`) SET ( c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3 ) ) WITH BROKER "broker_192_168_67_78" ( "hadoop.security.authentication"="kerberos", "kerberos_principal"="${doris_keytab_principal}", "kerberos_keytab"="/home/omm/doris_keytab/doris.keytab", "dfs.nameservices" = "hacluster", "dfs.ha.namenodes.hacluster" = "37,36", "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号", "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号", "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
- 集群未启用Kerberos认证(普通模式)
LOAD LABEL broker_load_2022_03_23 ( DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*") INTO TABLE test_t1 COLUMNS TERMINATED BY "," (c1,c2,c3) COLUMNS FROM PATH AS (`c4`) SET ( c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3 ) ) WITH BROKER "broker_192_168_67_78" ( "username"="hdfs", "password"="", "dfs.nameservices" = "hacluster", "dfs.ha.namenodes.hacluster" = "37,36", "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号", "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号", "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
其中:
- 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
- RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
- broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
- 其他参数介绍请参见表1。
表1 Broker Load命令参数介绍 参数
参数说明
kerberos_principal
访问Hadoop集群的keytab文件对应的principal。
- MRS 3.5.0之前版本,参数值为“doris/hadoop.hadoop.com@HADOOP.COM”。
- MRS 3.5.0及之后版本,格式为“doris/hadoop.${系统域名转换为小写}@${系统域名}”,系统域名可登录Manager页面,选择“系统 > 权限 > 域和互信”,查看“本端域”参数获取。例如,查看到的系统域名为“A39A7DF8_953D_4772_B909_035A594FFA55.COM”,则该参数值为“doris/hadoop.a39a7df8_953d_4772_b909_035a594ffa55.com@A39A7DF8_953D_4772_B909_035A594FFA55.COM”。
kerberos_keytab
访问Hadoop集群的keytab文件,该keytab位于FE节点的“${BIGDATA_HOME}/FusionInsight_Doris_*/install/FusionInsight-Doris-*/doris-be/bin/doris.keytab”路径中,需要拷贝该keytab文件到所有Broker节点上,例如“/home/omm/doris_keytab”目录下,并执行以下命令设置“doris.keytab”文件属组:
chown omm:wheel /home/omm/doris_keytab -R
dfs.nameservices
集群NameService名称。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。
dfs.ha.namenodes.hacluster
集群NameService前缀,包含两个值。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。
dfs.client.failover.proxy.provider.hacluster
指定HDFS客户端连接集群中Active状态节点的Java类,值为“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”。
- 集群已启用Kerberos认证(安全模式)
- 执行以下命令查看导入任务的状态信息:
show load order by createtime desc limit 1\G;
命令回显信息如下,表示导入任务执行成功:
JobId: 41326624 Label: broker_load_2022_03_23 State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27 TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1 ErrorMsg: NULL CreateTime: 2022-04-01 18:59:06 EtlStartTime: 2022-04-01 18:59:11 EtlFinishTime: 2022-04-01 18:59:11 LoadStartTime: 2022-04-01 18:59:11 LoadFinishTime: 2022-04-01 18:59:11 URL: NULL JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540} 1 row in set (0.01 sec)
- 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的Label ,命令为:
CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";
例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
- 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:
- 导入ORC格式的Hive表数据到Doris中
- 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:
cd 客户端安装目录
加载环境变量
source bigdata_env
认证用户,如果集群未启用Kerberos认证(普通模式),请跳过该操作:
kinit 组件业务用户
登录Hive客户端:
beeline
- 执行以下命令在default库创建ORC格式的Hive表:
CREATE TABLE test_orc_tbl( `c1` int, `c2` int, `c3` string) PARTITIONED BY (c4 string) row format delimited fields terminated by ','lines terminated by '\n' stored as orc;
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
如果集群已启用Kerberos认证(安全模式),需先执行以下命令:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
连接Doris数据库:
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 如果Hive组件和Doris组件是跨集群部署,需要修改以下配置:
- Doris所在集群的Doris的“hadoop.rpc.protection”配置项的值需与Hive所在集群中的HDFS组件的该配置项的值保持一致。
- 需修改Doris所在集群的DBroker的“BROKER_GC_OPTS”配置项的“-Djava.security.krb5.conf”参数,值为拷贝Hive所在集群的任一HiveServer实例节点的“$BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf” 文件到Doris所在集群的任一DBroker的任意目录。
- 执行以下命令创建Doris表:
CREATE TABLE example_db.test_orc_t1 ( `c1` int NOT NULL, `c4` date NULL, `c2` int NOT NULL, `c3` String NOT NULL ) ENGINE=OLAP UNIQUE KEY(`c1`, `c4`) PARTITION BY RANGE(`c4`) ( PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))) DISTRIBUTED BY HASH(`c1`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3", "dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "MONTH", "dynamic_partition.start" = "-2147483648", "dynamic_partition.end" = "2", "dynamic_partition.prefix" = "P_", "dynamic_partition.buckets" = "1", "in_memory" = "false", "storage_format" = "V2" );
- 执行以下命令使用Broker Load导入数据:
- 集群已启用Kerberos认证(安全模式):
LOAD LABEL broker_load_2022_03_24 ( DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*") INTO TABLE test_orc_t1 FORMAT AS "orc" (c1,c2,c3) COLUMNS FROM PATH AS (`c4`) SET ( c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3 ) ) WITH BROKER "broker_192_168_67_78" ( "hadoop.security.authentication"="kerberos", "kerberos_principal"="${doris_keytab_principal}", "kerberos_keytab"="/home/omm/doris_keytab/doris.keytab", "dfs.nameservices" = "hacluster", "dfs.ha.namenodes.hacluster" = "37,36", "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号", "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号", "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
- 集群未启用Kerberos认证(普通模式)
LOAD LABEL broker_load_2022_03_24 ( DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*") INTO TABLE test_orc_t1PROVIDOR FORMAT AS "orc" (c1,c2,c3) COLUMNS FROM PATH AS (`c4`) SET ( c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3 ) ) WITH BROKER "broker_192_168_67_78" ( 'username"="hdfs", 'password"="", "dfs.nameservices" = "hacluster", "dfs.ha.namenodes.hacluster" = "37,36", "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号", "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号", "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
其中:
- FORMAT AS "orc" :已指定待导入的数据格式为ORC。
- SET:定义Hive表和Doris表之间的字段映射关系及字段转换的规则。
- 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
- RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
- broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
- 其他参数介绍请参见表2。
表2 Broker Load命令参数介绍 参数
参数说明
kerberos_principal
访问Hadoop集群的keytab文件对应的principal。
- MRS 3.5.0之前版本,参数值为“doris/hadoop.hadoop.com@HADOOP.COM”。
- MRS 3.5.0及之后版本,格式为“doris/hadoop.${系统域名转换为小写}@${系统域名}”,系统域名可登录Manager页面,选择“系统 > 权限 > 域和互信”,查看“本端域”参数获取。例如,查看到的系统域名为“A39A7DF8_953D_4772_B909_035A594FFA55.COM”,则该参数值为“doris/hadoop.a39a7df8_953d_4772_b909_035a594ffa55.com@A39A7DF8_953D_4772_B909_035A594FFA55.COM”。
kerberos_keytab
访问Hadoop集群的keytab文件,该keytab位于FE节点的“${BIGDATA_HOME}/FusionInsight_Doris_*/install/FusionInsight-Doris-*/doris-be/bin/doris.keytab”路径中,需要拷贝该keytab文件到所有Broker节点上,例如“/home/omm/doris_keytab”目录下,并执行以下命令设置“doris.keytab”文件属组:
chown omm:wheel /home/omm/doris_keytab -R
dfs.nameservices
集群NameService名称。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。
dfs.ha.namenodes.hacluster
集群NameService前缀,包含两个值。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。
dfs.client.failover.proxy.provider.hacluster
指定HDFS客户端连接集群中Active状态节点的Java类,值为“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”。
- 集群已启用Kerberos认证(安全模式):
- 执行以下命令查看导入任务的状态信息:
show load order by createtime desc limit 1\G;
- 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的 Label ,命令为:
CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";
例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
- 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:
相关参数配置
以下配置属于Broker Load的系统级别配置,作用于所有Broker Load导入任务。
登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增以下参数:
- min_bytes_per_broker_scanner:用于限制单个BE处理的数据量的最小值,默认值为:64MB,单位为:bytes。
- max_bytes_per_broker_scanner:用于限制单个BE处理的数据量的最大值,默认值为:3G,单位为:bytes。
- max_broker_concurrency:用于限制一个作业的最大的导入并发数,默认值为:10。
最小处理的数据量、最大并发数、源文件的大小和当前集群BE节点的个数共同决定了本次任务导入的并发数:
- 本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
- 本次导入单个BE的处理量 = 源文件大小/本次导入的并发数
通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner * BE节点数。如果需要导入更大数据量,则需要适当调整“max_bytes_per_broker_scanner”参数的大小。