使用Broker Load方式导入OBS数据至Doris
Stream Load导入Doris数据需依赖客户端读取,再推送到Doris。Broker Load则是将导入请求发送给Doris,由Doris主动拉取数据,因此如果要导入的数据存储在对象存储中,使用Broker Load是最便捷的。使用Broker Load方式,数据就不需要经过客户端,而由Doris直接读取导入。
用户需要通过MySQL协议创建Broker Load导入,并通过查看导入命令检查导入结果。适用以下场景:
- 源数据在Broker可以访问的存储系统中,如OBS。
- 数据量在几十到百GB级别。
- 支持导入CSV、Parquet、ORC、JSON格式的数据,默认支持导入CSV格式数据。
该章节操作适用于MRS 3.5.0及之后版本。
前提条件
- 已创建包含Doris服务的集群,集群内各服务运行正常。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。
- 已准备待导入至Doris的数据文件。
创建OBS并行文件系统并获取AK/SK
创建OBS并行文件系统。
- 登录OBS管理控制台。
- 选择“并行文件系统 > 创建并行文件系统”。
- 填写文件系统名称,例如“doris-obs”。
企业项目需要与MRS集群保持一致,其他参数请根据需要填写。
- 单击“立即创建”。
- 在并行文件系统列表中单击新建的并行文件系统名称,单击“概览”,查看并记录“Endpoint”信息。
- 在左侧导航栏选择“文件”,单击“新建文件夹”,填写待创建的文件夹名称,例如“test”,单击“确定”。
- 单击新建的文件夹名称,单击“上传文件”,上传待导入的数据至该路径下,例如上传的文件为“test_data.csv”。
获取AK/SK信息。
导入OBS数据至Doris表中
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p -PFE查询连接端口 -hDoris FE实例IP地址
执行命令后输入数据库登录用户密码。
- Doris FE的查询连接端口,可以通过登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 执行以下命令创建数据库:
create database test_broker_load;
use test_broker_load;
- 执行以下命令创建表并导入OBS数据至表中:
CREATE TABLE IF NOT EXISTS test
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`gender` TINYINT COMMENT "用户性别",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `gender`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_label001
(
DATA INFILE("obs://并行文件系统名称/test/test_data.csv")
INTO TABLE `test`
COLUMNS TERMINATED BY ','
FORMAT AS "csv"
)
WITH BROKER "broker1"
(
"fs.obs.access.key" = "xxx",
"fs.obs.secret.key" = "xxx",
"fs.obs.endpoint" = "xxx"
);
- LOAD LABEL:每个导入任务需要指定一个唯一的Label,后续可以通过该Label来查看作业运行进度。
- DATA INFILE:7上传的待导入至Doris的数据文件所在的OBS路径。
- COLUMNS TERMINATED BY:用于指定列分隔符。该参数仅在CSV格式下需设置,且仅能指定单字节分隔符。
- FORMAT AS:用于指定文件类型,支持CSV、JSON、PARQUET和ORC格式,默认为CSV。
- WITH BROKER:指定需要使用的Broker服务名称。可使用以下命令查看当前集群的Broker信息:
- fs.obs.access.key:9查看的AK信息。
- fs.obs.secret.key:9查看的SK信息。
- fs.obs.endpoint:5查看的Endpoint信息。
- 执行以下命令查看任务执行进度:
show load order by createtime desc limit 1\G;
执行结果中“State”值变为“FINISHED”表示数据导入完成。
JobId: 296805 Label: brokerload_test_csv_label001 State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000 TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0 ErrorMsg: NULL ...
- 执行以下命令查看表数据:
select * from test;