更新时间:2025-09-04 GMT+08:00
分享

使用Broker Load方式导入数据至Doris

Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。

Doris表中的数据是有序的,Broker Load在导入数据时要利用Doris集群资源对数据进行排序,相对于Spark Load来完成海量历史数据迁移,对Doris的集群资源占用比较大。Broker Load方式是在用户没有Spark计算资源的情况下使用,如果有Spark计算资源建议使用Spark Load。

用户需要通过MySQL协议创建Broker Load导入,并通过查看导入命令检查导入结果。适用以下场景:

  • 源数据在Broker可以访问的存储系统中,如HDFS。
  • 数据量在几十到百GB级别。
  • 支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。

前提条件

  • 已创建包含Doris服务的集群,集群内各服务运行正常。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris
  • Doris中已安装并启动DBroker实例。
  • 已安装Hive客户端。
  • 如果Doris通过Broker Load跨集群导入数据,需要配置跨集群互信,相关操作可参考配置跨Manager集群互信

导入Hive表数据到Doris中

执行Broker Load导入任务前,可根据实际需求修改相关参数配置中的参数,以配置合理的导入任务并发数及导入作业支持的最大数量。

  • 导入Text格式的Hive表数据到Doris中
    1. 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:

      切换到客户端安装目录:

      cd 客户端安装目录

      加载环境变量

      source bigdata_env

      认证用户,如果集群未启用Kerberos认证(普通模式),请跳过该操作:

      kinit 组件业务用户

      登录Hive客户端:

      beeline
    2. 执行以下命令在default库创建Hive表,分区字段为“c4”:
      CREATE TABLE test_table(
      `c1` int,
      `c2` int,
      `c3` string)
      PARTITIONED BY (c4 string)
      row format delimited fields terminated by ','lines terminated by '\n' stored as textfile ;
    3. 执行以下命令插入数据到Hive表中:
      insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');
    4. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

      如果集群已启用Kerberos认证(安全模式),需先执行以下命令:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      连接Doris数据库:

      mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
      • 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
      • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
      • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
      • 如果Hive组件和Doris组件是跨集群部署,需要修改以下配置:
        • Doris所在集群的Doris的“hadoop.rpc.protection”配置项的值需与Hive所在集群中的HDFS组件的该配置项的值保持一致。
        • 需修改Doris所在集群的DBroker的“BROKER_GC_OPTS”配置项的“-Djava.security.krb5.conf”参数,值为拷贝Hive所在集群的任一HiveServer实例节点的“$BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf” 文件到Doris所在集群的任一DBroker的任意目录。
    5. 执行以下命令创建Doris表:
      CREATE TABLE example_db.test_t1 (
      `c1` int NOT NULL,
      `c4` date NULL,
      `c2` int NOT NULL,
      `c3` String NOT NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`c1`, `c4`)
      PARTITION BY RANGE(`c4`)
      (
      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
      DISTRIBUTED BY HASH(`c1`) BUCKETS 1
      PROPERTIES (
      "replication_allocation" = "tag.location.default: 3",
      "dynamic_partition.enable" = "true",
      "dynamic_partition.time_unit" = "MONTH",
      "dynamic_partition.start" = "-2147483648",
      "dynamic_partition.end" = "2",
      "dynamic_partition.prefix" = "P_",
      "dynamic_partition.buckets" = "1",
      "in_memory" = "false",
      "storage_format" = "V2"
      );
    6. 执行以下命令导入数据:
      • 集群已启用Kerberos认证(安全模式)
        LOAD LABEL broker_load_2022_03_23
        (
        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*")
        INTO TABLE test_t1
        COLUMNS TERMINATED BY ","
        (c1,c2,c3)
        COLUMNS FROM PATH AS (`c4`)
        SET
        (
        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
        )
        )
        WITH BROKER "broker_192_168_67_78"
        (
        "hadoop.security.authentication"="kerberos",
        "kerberos_principal"="${doris_keytab_principal}",
        "kerberos_keytab"="/home/omm/doris_keytab/doris.keytab",
        "dfs.nameservices" = "hacluster",
        "dfs.ha.namenodes.hacluster" = "37,36",
        "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号",
        "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号",
        "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        )
        PROPERTIES
        (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
        );
      • 集群未启用Kerberos认证(普通模式)
        LOAD LABEL broker_load_2022_03_23
        (
        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*")
        INTO TABLE test_t1
        COLUMNS TERMINATED BY ","
        (c1,c2,c3)
        COLUMNS FROM PATH AS (`c4`)
        SET
        (
        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
        )
        )
        WITH BROKER "broker_192_168_67_78"
        (
        "username"="hdfs",
        "password"="",
        "dfs.nameservices" = "hacluster",
        "dfs.ha.namenodes.hacluster" = "37,36",
        "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号",
        "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号",
        "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        )
        PROPERTIES
        (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
        );

      其中:

      • 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
      • RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
      • broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
      • 其他参数介绍请参见表1
        表1 Broker Load命令参数介绍

        参数

        参数说明

        kerberos_principal

        访问Hadoop集群的keytab文件对应的principal。

        • MRS 3.5.0之前版本,参数值为“doris/hadoop.hadoop.com@HADOOP.COM”。
        • MRS 3.5.0及之后版本,格式为“doris/hadoop.${系统域名转换为小写}@${系统域名}”,系统域名可登录Manager页面,选择“系统 > 权限 > 域和互信”,查看“本端域”参数获取。例如,查看到的系统域名为“A39A7DF8_953D_4772_B909_035A594FFA55.COM”,则该参数值为“doris/hadoop.a39a7df8_953d_4772_b909_035a594ffa55.com@A39A7DF8_953D_4772_B909_035A594FFA55.COM”。

        kerberos_keytab

        访问Hadoop集群的keytab文件,该keytab位于FE节点的“${BIGDATA_HOME}/FusionInsight_Doris_*/install/FusionInsight-Doris-*/doris-be/bin/doris.keytab”路径中,需要拷贝该keytab文件到所有Broker节点上,例如/home/omm/doris_keytab”目录下,并执行以下命令设置“doris.keytab”文件属组:

        chown omm:wheel /home/omm/doris_keytab -R

        dfs.nameservices

        集群NameService名称。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。

        dfs.ha.namenodes.hacluster

        集群NameService前缀,包含两个值。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。

        dfs.client.failover.proxy.provider.hacluster

        指定HDFS客户端连接集群中Active状态节点的Java类,值为“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”。

    7. 执行以下命令查看导入任务的状态信息:
      show load order by createtime desc limit 1\G;

      命令回显信息如下,表示导入任务执行成功:

      JobId: 41326624
      Label: broker_load_2022_03_23
      State: FINISHED
      Progress: ETL:100%; LOAD:100%
      Type: BROKER
      EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
      CreateTime: 2022-04-01 18:59:06
      EtlStartTime: 2022-04-01 18:59:11
      EtlFinishTime: 2022-04-01 18:59:11
      LoadStartTime: 2022-04-01 18:59:11
      LoadFinishTime: 2022-04-01 18:59:11
      URL: NULL
      JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
      1 row in set (0.01 sec)
    8. 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的Label ,命令为:
      CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";

      例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
  • 导入ORC格式的Hive表数据到Doris中
    1. 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:

      切换到客户端安装目录:

      cd 客户端安装目录

      加载环境变量

      source bigdata_env

      认证用户,如果集群未启用Kerberos认证(普通模式),请跳过该操作:

      kinit 组件业务用户

      登录Hive客户端:

      beeline
    2. 执行以下命令在default库创建ORC格式的Hive表:
      CREATE TABLE test_orc_tbl(
      `c1` int,
      `c2` int,
      `c3` string)
      PARTITIONED BY (c4 string)
      row format delimited fields terminated by ','lines terminated by '\n' stored as orc;
    3. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

      如果集群已启用Kerberos认证(安全模式),需先执行以下命令:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      连接Doris数据库:

      mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
      • 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
      • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
      • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
      • 如果Hive组件和Doris组件是跨集群部署,需要修改以下配置:
        • Doris所在集群的Doris的“hadoop.rpc.protection”配置项的值需与Hive所在集群中的HDFS组件的该配置项的值保持一致。
        • 需修改Doris所在集群的DBroker的“BROKER_GC_OPTS”配置项的“-Djava.security.krb5.conf”参数,值为拷贝Hive所在集群的任一HiveServer实例节点的“$BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf” 文件到Doris所在集群的任一DBroker的任意目录。
    4. 执行以下命令创建Doris表:
      CREATE TABLE example_db.test_orc_t1 (
      `c1` int NOT NULL,
      `c4` date NULL,
      `c2` int NOT NULL,
      `c3` String NOT NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`c1`, `c4`)
      PARTITION BY RANGE(`c4`)
      (
      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
      DISTRIBUTED BY HASH(`c1`) BUCKETS 1
      PROPERTIES (
      "replication_allocation" = "tag.location.default: 3",
      "dynamic_partition.enable" = "true",
      "dynamic_partition.time_unit" = "MONTH",
      "dynamic_partition.start" = "-2147483648",
      "dynamic_partition.end" = "2",
      "dynamic_partition.prefix" = "P_",
      "dynamic_partition.buckets" = "1",
      "in_memory" = "false",
      "storage_format" = "V2"
      );
    5. 执行以下命令使用Broker Load导入数据:
      • 集群已启用Kerberos认证(安全模式)
        LOAD LABEL broker_load_2022_03_24
        (
        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*")
        INTO TABLE test_orc_t1
        FORMAT AS "orc"
        (c1,c2,c3)
        COLUMNS FROM PATH AS (`c4`)
        SET
        (
        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
        )
        )
        WITH BROKER "broker_192_168_67_78"
        (
        "hadoop.security.authentication"="kerberos",
        "kerberos_principal"="${doris_keytab_principal}",
        "kerberos_keytab"="/home/omm/doris_keytab/doris.keytab",
        "dfs.nameservices" = "hacluster",
        "dfs.ha.namenodes.hacluster" = "37,36",
        "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号",
        "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号",
        "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        )
        PROPERTIES
        (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
        );
      • 集群未启用Kerberos认证(普通模式)
        LOAD LABEL broker_load_2022_03_24
        (
        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*")
        INTO TABLE test_orc_t1PROVIDOR
        FORMAT AS "orc"
        (c1,c2,c3)
        COLUMNS FROM PATH AS (`c4`)
        SET
        (
        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
        )
        )
        WITH BROKER "broker_192_168_67_78"
        (
        'username"="hdfs",
        'password"="",
        "dfs.nameservices" = "hacluster",
        "dfs.ha.namenodes.hacluster" = "37,36",
        "dfs.namenode.rpc-address.hacluster.37" = "主NameNode实例IP地址:RPC端口号",
        "dfs.namenode.rpc-address.hacluster.36" = "备NameNode实例IP地址:RPC端口号",
        "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        )
        PROPERTIES
        (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
        );

      其中:

      • FORMAT AS "orc" :已指定待导入的数据格式为ORC。
      • SET:定义Hive表和Doris表之间的字段映射关系及字段转换的规则。
      • 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
      • RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
      • broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
      • 其他参数介绍请参见表2
        表2 Broker Load命令参数介绍

        参数

        参数说明

        kerberos_principal

        访问Hadoop集群的keytab文件对应的principal。

        • MRS 3.5.0之前版本,参数值为“doris/hadoop.hadoop.com@HADOOP.COM”。
        • MRS 3.5.0及之后版本,格式为“doris/hadoop.${系统域名转换为小写}@${系统域名}”,系统域名可登录Manager页面,选择“系统 > 权限 > 域和互信”,查看“本端域”参数获取。例如,查看到的系统域名为“A39A7DF8_953D_4772_B909_035A594FFA55.COM”,则该参数值为“doris/hadoop.a39a7df8_953d_4772_b909_035a594ffa55.com@A39A7DF8_953D_4772_B909_035A594FFA55.COM”。

        kerberos_keytab

        访问Hadoop集群的keytab文件,该keytab位于FE节点的“${BIGDATA_HOME}/FusionInsight_Doris_*/install/FusionInsight-Doris-*/doris-be/bin/doris.keytab”路径中,需要拷贝该keytab文件到所有Broker节点上,例如/home/omm/doris_keytab”目录下,并执行以下命令设置“doris.keytab”文件属组:

        chown omm:wheel /home/omm/doris_keytab -R

        dfs.nameservices

        集群NameService名称。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。

        dfs.ha.namenodes.hacluster

        集群NameService前缀,包含两个值。可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/1_*_NameNode/etc”目录下的“hdfs-site.xml”中查找该配置项的值。

        dfs.client.failover.proxy.provider.hacluster

        指定HDFS客户端连接集群中Active状态节点的Java类,值为“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”。

    6. 执行以下命令查看导入任务的状态信息:
      show load order by createtime desc limit 1\G;
    7. 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的 Label ,命令为:
      CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";

      例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

相关参数配置

以下配置属于Broker Load的系统级别配置,作用于所有Broker Load导入任务。

登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增以下参数:

  • min_bytes_per_broker_scanner:用于限制单个BE处理的数据量的最小值,默认值为:64MB,单位为:bytes。
  • max_bytes_per_broker_scanner:用于限制单个BE处理的数据量的最大值,默认值为:3G,单位为:bytes。
  • max_broker_concurrency:用于限制一个作业的最大的导入并发数,默认值为:10。

最小处理的数据量、最大并发数、源文件的大小和当前集群BE节点的个数共同决定了本次任务导入的并发数:

  • 本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
  • 本次导入单个BE的处理量 = 源文件大小/本次导入的并发数

通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner * BE节点数。如果需要导入更大数据量,则需要适当调整“max_bytes_per_broker_scanner”参数的大小。

相关文档