更新时间:2024-08-30 GMT+08:00
分享

使用Broker Load方式导入数据至Doris

Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。

Doris表中的数据是有序的,Broker Load在导入数据时要利用Doris集群资源对数据进行排序,相对于Spark Load来完成海量历史数据迁移,对Doris的集群资源占用比较大。Broker Load方式是在用户没有Spark计算资源的情况下使用,如果有Spark计算资源建议使用Spark Load。

用户需要通过MySQL协议创建Broker Load 导入,并通过查看导入命令检查导入结果。适用以下场景:

  • 源数据在Broker可以访问的存储系统中,如HDFS。
  • 数据量在几十到百GB级别。
  • 支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。

前提条件

  • 已创建包含Doris服务的集群,集群内各服务运行正常。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris
  • Doris中已安装并启动DBroker实例。
  • 已安装Hive客户端。
  • 如果Doris通过Broker Load跨集群导入数据,需要配置跨集群互信,相关操作可参考配置跨Manager集群互信

导入Hive表数据到Doris中

  • 导入Text格式的Hive表数据到Doris中
    1. 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:

      cd 客户端安装目录

      source bigdata_env

      kinit 组件业务用户(如果集群未启用Kerberos认证(普通模式)请跳过该操作)

    2. 执行以下命令在default库创建Hive表,分区字段为“c4”:

      CREATE TABLE test_table(

      `c1` int,

      `c2` int,

      `c3` string)

      PARTITIONED BY (c4 string)

      row format delimited fields terminated by ','lines terminated by '\n' stored as textfile ;

    3. 执行以下命令插入数据到Hive表中:

      insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');

    4. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

      如果集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址

      • 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
      • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
      • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
      • 如果Hive组件和Doris组件是跨集群部署,需要修改以下配置:
        • Doris所在集群的Doris的“hadoop.rpc.protection”配置项的值需与Hive所在集群中的HDFS组件的该配置项的值保持一致。
        • 需修改Doris所在集群的DBroker的“BROKER_GC_OPTS”配置项的“-Djava.security.krb5.conf”参数,值为拷贝Hive所在集群的任一HiveServer实例节点的“$BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf” 文件到Doris所在集群的任一DBroker的任意目录。
    5. 执行以下命令创建Doris表:

      CREATE TABLE example_db.test_t1 (

      `c1` int NOT NULL,

      `c4` date NULL,

      `c2` int NOT NULL,

      `c3` String NOT NULL

      ) ENGINE=OLAP

      UNIQUE KEY(`c1`, `c4`)

      PARTITION BY RANGE(`c4`)

      (

      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))

      DISTRIBUTED BY HASH(`c1`) BUCKETS 1

      PROPERTIES (

      "replication_allocation" = "tag.location.default: 3",

      "dynamic_partition.enable" = "true",

      "dynamic_partition.time_unit" = "MONTH",

      "dynamic_partition.start" = "-2147483648",

      "dynamic_partition.end" = "2",

      "dynamic_partition.prefix" = "P_",

      "dynamic_partition.buckets" = "1",

      "in_memory" = "false",

      "storage_format" = "V2"

      );

    6. 执行以下命令导入数据:
      • 集群已启用Kerberos认证(安全模式)

        LOAD LABEL broker_load_2022_03_23

        (

        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*")

        INTO TABLE test_t1

        COLUMNS TERMINATED BY ","

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "hadoop.security.authentication"="kerberos",

        "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",

        "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.0/install/FusionInsight-Doris-1.2.3/doris-fe/bin/doris.keytab"

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • 集群未启用Kerberos认证(普通模式)

        LOAD LABEL broker_load_2022_03_23

        (

        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_table/*/*")

        INTO TABLE test_t1

        COLUMNS TERMINATED BY ","

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "username"="hdfs",

        "password"=""

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
      • RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
      • broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
    7. 执行以下命令查看导入任务的状态信息:

      show load order by createtime desc limit 1\G;

      JobId: 41326624
      Label: broker_load_2022_03_23
      State: FINISHED
      Progress: ETL:100%; LOAD:100%
      Type: BROKER
      EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
      CreateTime: 2022-04-01 18:59:06
      EtlStartTime: 2022-04-01 18:59:11
      EtlFinishTime: 2022-04-01 18:59:11
      LoadStartTime: 2022-04-01 18:59:11
      LoadFinishTime: 2022-04-01 18:59:11
      URL: NULL
      JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
      1 row in set (0.01 sec)
    8. 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的Label ,命令为:

      CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";

      例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

  • 导入ORC格式的Hive表数据到Doris中
    1. 使用客户端安装用户登录安装了Hive客户端的节点,执行以下命令登录Hive beeline命令行:

      cd 客户端安装目录

      source bigdata_env

      kinit 组件业务用户(如果集群未启用Kerberos认证(普通模式)请跳过该操作)

    2. 执行以下命令在default库创建ORC格式的Hive表:

      CREATE TABLE test_orc_tbl(

      `c1` int,

      `c2` int,

      `c3` string)

      PARTITIONED BY (c4 string)

      row format delimited fields terminated by ','lines terminated by '\n' stored as orc;

    3. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

      如果集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址

      • 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
      • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
      • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
      • 如果Hive组件和Doris组件是跨集群部署,需要修改以下配置:
        • Doris所在集群的Doris的“hadoop.rpc.protection”配置项的值需与Hive所在集群中的HDFS组件的该配置项的值保持一致。
        • 需修改Doris所在集群的DBroker的“BROKER_GC_OPTS”配置项的“-Djava.security.krb5.conf”参数,值为拷贝Hive所在集群的任一HiveServer实例节点的“$BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf” 文件到Doris所在集群的任一DBroker的任意目录。
    4. 执行以下命令创建Doris表:

      CREATE TABLE example_db.test_orc_t1 (

      `c1` int NOT NULL,

      `c4` date NULL,

      `c2` int NOT NULL,

      `c3` String NOT NULL

      ) ENGINE=OLAP

      UNIQUE KEY(`c1`, `c4`)

      PARTITION BY RANGE(`c4`)

      (

      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))

      DISTRIBUTED BY HASH(`c1`) BUCKETS 1

      PROPERTIES (

      "replication_allocation" = "tag.location.default: 3",

      "dynamic_partition.enable" = "true",

      "dynamic_partition.time_unit" = "MONTH",

      "dynamic_partition.start" = "-2147483648",

      "dynamic_partition.end" = "2",

      "dynamic_partition.prefix" = "P_",

      "dynamic_partition.buckets" = "1",

      "in_memory" = "false",

      "storage_format" = "V2"

      );

    5. 执行以下命令使用Broker Load 导入数据:
      • 集群已启用Kerberos认证(安全模式)

        LOAD LABEL broker_load_2022_03_24

        (

        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*")

        INTO TABLE test_orc_t1

        FORMAT AS "orc"

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "hadoop.security.authentication"="kerberos",

        "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",

        "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.0/install/FusionInsight-Doris-1.2.3/doris-fe/bin/doris.keytab"

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • 集群未启用Kerberos认证(普通模式)

        LOAD LABEL broker_load_2022_03_24

        (

        DATA INFILE("hdfs://主NameNode实例IP地址:RPC端口号/user/hive/warehouse/test_orc_tbl/*/*")

        INTO TABLE test_orc_t1

        FORMAT AS "orc"

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        'username"="hdfs",

        'password"=""

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • FORMAT AS "orc" :已指定待导入的数据格式为ORC。
      • SET:定义Hive表和Doris表之间的字段映射关系及字段转换的规则。
      • 主NameNode实例IP地址可在Manager界面,选择“集群 > 服务 > HDFS > 实例”查看。
      • RPC端口号可在Manager界面,选择“集群 > 服务 > HDFS > 配置”,搜索“dfs.namenode.rpc.port”查看。
      • broker_192_168_67_78表示Broker名称,可在MySQL客户端执行show broker;命令查看。
    6. 执行以下命令查看导入任务的状态信息:

      show load order by createtime desc limit 1\G;

    7. 可手动取消Broker Load作业状态不为“CANCELLED”或“FINISHED”的导入任务,取消时需要指定待取消导入任务的 Label ,命令为:

      CANCEL LOAD FROM 数据库名称 WHERE LABEL = "Label名称";

      例如:撤销数据库demo上, label为broker_load_2022_03_23的导入作业:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

相关参数配置

以下配置属于Broker Load的系统级别配置,作用于所有Broker Load导入任务。

登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增以下参数:

  • min_bytes_per_broker_scanner:用于限制单个BE处理的数据量的最小值,默认值为:64MB,单位为:bytes。
  • max_bytes_per_broker_scanner:用于限制单个BE处理的数据量的最大值,默认值为:3G,单位为:bytes。
  • max_broker_concurrency:用于限制一个作业的最大的导入并发数,默认值为:10。

最小处理的数据量、最大并发数、源文件的大小和当前集群BE节点的个数共同决定了本次任务导入的并发数:

  • 本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
  • 本次导入单个BE的处理量 = 源文件大小/本次导入的并发数

通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner * BE节点数。如果需要导入更大数据量,则需要适当调整“max_bytes_per_broker_scanner”参数的大小。

相关文档