文档首页/ MapReduce服务 MRS/ 最佳实践/ 数据分析/ 实时OLAP数据分析/ 通过StreamLoad将本地CSV文件导入Doris
更新时间:2025-08-09 GMT+08:00
分享

通过StreamLoad将本地CSV文件导入Doris

应用场景

本地CSV文件数据通过StreamLoad导入到Doris,需要写入Doris的数据不重复,通过引入 Unique 数据模型保证 Key 的唯一性。

方案架构

离线数据可以从数据湖加载,也可以直接加载本地文件。从数据湖加载可以使用工具CDM,在没有CDM工具时,可以直接使用外表加载、BrokerLoad的方式。

本地文件加载则使用Doris自带的StreamLoad工具。

图1 离线数据加载
  • 场景特点:

    业务特点为离线数据,端到端时延无要求,仅对数据集市中的数据查询时延有要求,如离线报表场景、实时报表的数据初始化场景。

  • 数据加载方式:
    • 在存在DataArts Studio工具的场景下,数据加载统一使用CDM进行(DataArts Studio需要2.10及以上版本)。
    • 在不存在DataArts Studio工具的场景下,可以使用Spark或者外表方式加载。由数据加工方进行数据推送加载时,推荐使用BorkerLoad加载方式,方便数据加工方统一调度;如果有数据应用方负责数据加载,则使用外表方式进行数据加载较为方便应用层另外配置加载数据。
    • 在没有数据湖的场景下,轻量化数仓场景中,数据直接由源端文件加载到Doris中,可以使用Streamload方式加载本地文件。
  • 注意事项:

    Hive外表通过整体添加Hive的MetaStore的方式,将全量元数据在Doris中呈现。

步骤1:创建MRS Doris集群并配置

  1. 创建MRS Doris集群。

    详细操作请参考自定义购买MRS集群,例如相关参数如下:

    • 集群名称:自定义,例如“mrs_doris”
    • 集群版本:MRS 3.3.0-LTS
    • 组件选择:Doris集群。
    • Kerberos认证:开启

    其他参数根据实际需要进行配置。

  2. 集群购买完成后安装集群客户端。

    详细操作请参考安装客户端(3.x版本)

    例如客户端安装路径为“/opt/dorisclient”。

  3. 在Doris集群安装MySQL客户端。

    详细操作请参考使用MySQL客户端连接Doris

  4. 创建拥有Doris管理权限的人机用户(例如用户名为dorisuser),并修改初始密码。

    1. 登录Doris集群Manager页面。
    2. 选择“系统 > 权限 > 角色 > 添加角色”,填写角色名称,如“dorisrole”,在“配置资源权限”选择“待操作的集群 > Doris”,勾选“Doris管理员权限”,单击“确定”。
    3. 选择“用户 > 添加用户”,填写用户名称,如“dorisuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“dorisrole”角色,单击“确定”。
    4. 使用新建的dorisuser用户重新登录FusionInsight Manager,修改该用户初始密码。

  5. 如果要导入的文件大小超过10G,则需要修改超时时间及最大导入文件大小参数。

    1. 登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > 全部配置”。
    2. 选择“FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增参数“stream_load_default_timeout_second”,参数描述及取值请参见表1
    3. 选择“BE(角色) > 自定义”,在自定义参数“be.conf.customized.configs”中新增参数“streaming_load_max_mb”,参数描述及取值请参见表1
    表1 自定义参数描述

    参数名

    参数描述

    stream_load_default_timeout_second

    表示导入任务的超时时间,默认超时时间为600秒,单位为秒。

    若导入任务在设定的时间内未完成则会被系统取消,状态变为“CANCELLED”。默认超时时间为600秒,如果导入的源文件无法在规定时间内完成导入,可以在Stream Load请求中设置单独的超时时间,或调整“stream_load_default_timeout_second”参数值设置全局的默认超时时间。

    streaming_load_max_mb

    表示Stream Load的最大导入文件大小,默认值为10G,单位为MB。

    如果原始文件超过该值,则需要适当调整该参数值。

    由于Stream Load是由BE进行导入并分发数据,建议导入数据量在1G到10G之间。由于默认最大Stream Load的导入数据量为10G,所以如果要导入超过10G的文件需要修改BE的配置参数“streaming_load_max_mb”。

步骤2:创建Doris数据表及CSV文件

  1. 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
    mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
    • 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
    • Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。

  2. 执行以下命令创建数据库(例如名称为“example_db”)并切换。

    create database if not exists example_db;
    use example_db;

  3. 执行以下命令创建表。

    CREATE TABLE `lineorder_stream_u` (
      `LO_ORDERKEY` bigint NOT NULL COMMENT "订单id",
      `LO_LINENUMBER` int NOT NULL COMMENT "订单码",
      `LO_CUSTKEY` bigint NOT NULL COMMENT "客户id",
      `LO_PARTKEY` bigint NOT NULL COMMENT "商品id",
      `LO_SUPPKEY` bigint NOT NULL COMMENT "供应商id",
      `LO_ORDERDATE` date DEFAULT NULL COMMENT "订单日期",
      `LO_ORDERPRIORITY` varchar(100) DEFAULT NULL COMMENT "订单优先级",
      `LO_SHIPPRIORITY` int DEFAULT NULL COMMENT "物流优先级",
      `LO_QUANTITY` int DEFAULT NULL COMMENT "数量",
      `LO_EXTENDEDPRICE` bigint DEFAULT NULL COMMENT "扩展价格",
      `LO_ORDTOTALPRICE` bigint DEFAULT NULL COMMENT "订单总价",
      `LO_DISCOUNT` int DEFAULT NULL COMMENT "折扣",
      `LO_REVENUE` bigint DEFAULT NULL COMMENT "收入",
      `LO_SUPPLYCOST` int DEFAULT NULL COMMENT "成本",
      `LO_TAX` int DEFAULT NULL COMMENT "税款",
      `LO_COMMITDATE` date DEFAULT NULL COMMENT "承诺日期",
      `LO_SHIPMODE` varchar(100) NOT NULL COMMENT "运输方式"
    ) UNIQUE KEY (
      `LO_ORDERKEY`,
      `LO_LINENUMBER`,
      `LO_CUSTKEY`,
      `LO_PARTKEY`,
      `LO_SUPPKEY`,
      `LO_ORDERDATE`
    ) PARTITION BY RANGE(`LO_ORDERDATE`) (
      PARTITION `p1992`
      VALUES
        LESS THAN ("1993-01-01"),
        PARTITION `p1993`
      VALUES
        LESS THAN ("1994-01-01"),
        PARTITION `p1994`
      VALUES
        LESS THAN ("1995-01-01"),
        PARTITION `p1995`
      VALUES
        LESS THAN ("1996-01-01"),
        PARTITION `p1996`
      VALUES
        LESS THAN ("1997-01-01"),
        PARTITION `p1997`
      VALUES
        LESS THAN ("1998-01-01"),
        PARTITION `p1998`
      VALUES
        LESS THAN ("1999-01-01"),
        PARTITION `p1999`
      VALUES
        LESS THAN ("2000-01-01"),
        PARTITION `p2000`
      VALUES
        LESS THAN ("2001-01-01"),
        PARTITION `p2001`
      VALUES
        LESS THAN ("2002-01-01")
    ) DISTRIBUTED BY HASH(`LO_ORDERDATE`) BUCKETS 1 PROPERTIES (
      "replication_allocation" = "tag.location.default: 3"
    );

    例如执行show tables;后结果如下,表示建表成功:

  4. 退出Doris数据库至客户端,创建数据文件“data.csv”,内容如下。

    1,1,1,1,1,1997-01-01,01-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    1,1,1,1,1,1997-01-01,10-SEE,0,10,1210,18547410,10,2311310,32310,10,1997-01-10,DER10
    2,2,2,2,2,1997-01-01,02-SEE,0,2,1202,18547402,2,2311302,32302,2,1997-01-02,DER02
    2,2,2,2,2,1997-01-01,01-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    3,3,3,3,3,1997-01-01,03-SEE,0,3,1203,18547403,3,2311303,32303,3,1997-01-03,DER03
    3,3,3,3,3,1997-01-01,01-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    4,4,4,4,4,1997-01-01,04-SEE,0,4,1204,18547404,4,2311304,32304,4,1997-01-04,DER04
    4,4,4,4,4,1997-01-01,01-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    5,5,5,5,5,1997-01-01,05-SEE,0,5,1205,18547405,5,2311305,32305,5,1997-01-05,DER05
    5,5,5,5,5,1997-01-01,01-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    6,6,6,6,6,1997-01-01,06-SEE,0,6,1206,18547406,6,2311306,32306,6,1997-01-06,DER06
    6,6,6,6,6,1997-01-01,06-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    7,7,7,7,7,1997-01-01,07-SEE,0,7,1207,18547407,7,2311307,32307,7,1997-01-07,DER07
    7,7,7,7,7,1997-01-01,07-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    8,8,8,8,8,1997-01-01,08-SEE,0,8,1208,18547408,8,2311308,32308,8,1997-01-08,DER08
    8,8,8,8,8,1997-01-01,08-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    9,9,9,9,9,1997-01-01,09-SEE,0,9,1209,18547409,9,2311309,32309,9,1997-01-09,DER09
    9,9,9,9,9,1997-01-01,09-SEE,0,1,1201,18547401,1,231,3201,1,1997-01-01,DER01
    10,10,10,10,10,1997-01-01,10-SEE,0,1,1201,18547401,1,2311301,32301,1,1997-01-01,DER01
    10,10,10,10,10,1997-01-01,10-SEE,0,1,1201,18547401,1,2311301,32301,1,1997-01-01,DER01

步骤3:使用Stream Load导入CSV文件至Doris

  1. root用户登录“data.csv”文件所在节点。
  2. 执行以下命令使用Stream Load导入“data.csv”中的数据到Doris数据表中。

    • 集群已启用Kerberos认证(安全模式)
      curl --location-trusted -u doris用户:用户密码 -H "label:table1_20230217" -H "column_separator:," -T data.csv https://Doris FE实例IP地址:HTTPS端口/api/{数据库名}/{表名}/_stream_load
    • 集群未启用Kerberos认证(普通模式)
      curl -k --location-trusted -u doris用户:用户密码 -H "label:table1_20230217" -H "column_separator:," -T data.csv http://Doris FE实例IP地址:HTTP端口/api/{数据库名}/{表名}/_stream_load
    • -H "label:table1_20230217"表示,指定导入数据标签为table1_20230217,不指定时系统会自动生成。
    • -H "column_separator:,"表示,CSV文件中多字段值的分隔符为逗号,可根据实际数据格式调整分隔符。
    • Doris FE实例IP地址:可在Manager界面,选择“集群 > 服务 > Doris > 实例”查看FE实例业务IP地址。
    • HTTPS端口号:可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看。
    • HTTP端口号:可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“http_port”查看。

  3. 执行以下命令查看表数据。

    select * from example_db.lineorder_stream_u;

    例如查询结果如下,查询数据成功,且符合预期,表示导入成功。

相关文档