更新时间:2024-12-11 GMT+08:00

使用Broker Load方式导入OBS数据至Doris

Stream Load导入Doris数据需依赖客户端读取,再推送到Doris。Broker Load则是将导入请求发送给Doris,由Doris主动拉取数据,因此如果要导入的数据存储在对象存储中,使用Broker Load是最便捷的。使用Broker Load方式,数据就不需要经过客户端,而由Doris直接读取导入。

用户需要通过MySQL协议创建Broker Load导入,并通过查看导入命令检查导入结果。适用以下场景:

  • 源数据在Broker可以访问的存储系统中,如OBS。
  • 数据量在几十到百GB级别。
  • 支持导入CSV、Parquet、ORC、JSON格式的数据,默认支持导入CSV格式数据。

该章节操作适用于MRS 3.5.0及之后版本。

前提条件

  • 已创建包含Doris服务的集群,集群内各服务运行正常。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris
  • 已准备待导入至Doris的数据文件。

创建OBS并行文件系统并获取AK/SK

创建OBS并行文件系统

  1. 登录OBS管理控制台。
  2. 选择“并行文件系统 > 创建并行文件系统”。
  3. 填写文件系统名称,例如“doris-obs”。

    企业项目需要与MRS集群保持一致,其他参数请根据需要填写。

  4. 单击“立即创建”。
  5. 在并行文件系统列表中单击新建的并行文件系统名称,单击“概览”,查看并记录“Endpoint”信息。

    用户删除服务或者卸载集群后,可能导致2~4创建的并行文件系统下残留脏数据,请用户手动删除。

  6. 在左侧导航栏选择“文件”,单击“新建文件夹”,填写待创建的文件夹名称,例如“test”,单击“确定”。
  7. 单击新建的文件夹名称,单击“上传文件”,上传待导入的数据至该路径下,例如上传的文件为“test_data.csv”。

获取AK/SK信息

  1. 鼠标移到右上角登录用户名处在下拉列表中选择“我的凭证”。
  2. 单击“访问密钥”页签,单击“新增访问密钥”,输入验证码或密码。单击“确定”,生成并下载访问密钥,在.csv文件中获取AK/SK信息。

导入OBS数据至Doris表中

  1. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

    集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -u数据库登录用户 -p -PFE查询连接端口 -hDoris FE实例IP地址

    执行命令后输入数据库登录用户密码。

    • Doris FE的查询连接端口,可以通过登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
    • Doris FE实例IP地址可通过登录MRS集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
    • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。

  2. 执行以下命令创建数据库:

    create database test_broker_load;

    use test_broker_load;

  3. 执行以下命令创建表并导入OBS数据至表中:

    CREATE TABLE IF NOT EXISTS test

    (

    `user_id` LARGEINT NOT NULL COMMENT "用户id",

    `city` VARCHAR(20) COMMENT "用户所在城市",

    `age` SMALLINT COMMENT "用户年龄",

    `gender` TINYINT COMMENT "用户性别",

    `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",

    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",

    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"

    )

    AGGREGATE KEY(`user_id`, `city`, `age`, `gender`)

    DISTRIBUTED BY HASH(`user_id`) BUCKETS 100

    PROPERTIES (

    "replication_allocation" = "tag.location.default: 3"

    );

    LOAD LABEL brokerload_test_label001

    (

    DATA INFILE("obs://并行文件系统名称/test/test_data.csv")

    INTO TABLE `test`

    COLUMNS TERMINATED BY ','

    FORMAT AS "csv"

    )

    WITH BROKER "broker1"

    (

    "fs.obs.access.key" = "xxx",

    "fs.obs.secret.key" = "xxx",

    "fs.obs.endpoint" = "xxx"

    );

    • LOAD LABEL:每个导入任务需要指定一个唯一的Label,后续可以通过该Label来查看作业运行进度。
    • DATA INFILE:7上传的待导入至Doris的数据文件所在的OBS路径。
    • COLUMNS TERMINATED BY:用于指定列分隔符。该参数仅在CSV格式下需设置,且仅能指定单字节分隔符。
    • FORMAT AS:用于指定文件类型,支持CSV、JSON、PARQUET和ORC格式,默认为CSV。
    • WITH BROKER:指定需要使用的Broker服务名称。可使用以下命令查看当前集群的Broker信息:

      show broker;

    • fs.obs.access.key:9查看的AK信息。
    • fs.obs.secret.key:9查看的SK信息。
    • fs.obs.endpoint:5查看的Endpoint信息。

  4. 执行以下命令查看任务执行进度:

    show load order by createtime desc limit 1\G;

    执行结果中“State”值变为“FINISHED”表示数据导入完成。

    JobId: 296805
    Label: brokerload_test_csv_label001
    State: FINISHED
    Progress: ETL:100%; LOAD:100%
    Type: BROKER
    EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
    TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
     ErrorMsg: NULL
    ...

  5. 执行以下命令查看表数据:

    select * from test;