Updated on 2024-05-29 GMT+08:00

Broker Load

Broker Load is an asynchronous import method. The data sources it supports depend on those supported by the Broker process.

Data in a Doris table is ordered, so Broker Load uses Doris cluster resources to sort the data during import. Compared with Spark Load, which is intended for migrating massive amounts of historical data, Broker Load therefore occupies a large share of Doris cluster resources. Use Broker Load when no Spark compute resources are available. If Spark compute resources are available, Spark Load is recommended.

A Broker Load job is submitted over the MySQL protocol, and its result is checked by running a command that shows the import status (show load). Broker Load is used in the following scenarios:

  • The source data is in a storage system that the broker can access, such as HDFS.
  • The data volume ranges from tens to hundreds of GB.
  • The source data is in CSV, Parquet, or ORC format. CSV is assumed by default unless another format is specified.

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The node to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as the created dorisuser user, and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Installing a MySQL Client.
  • The DBroker instance has been installed and started in Doris.
  • The Hive client has been installed.
  • If Doris imports data across clusters through Broker Load, you need to configure cross-cluster mutual trust. For details, see Configuring Cross-Manager Mutual Trust Between Clusters.

Importing Hive Table Data to Doris

  • Importing Hive Table Data in Text Format to Doris
    1. Run the following commands to log in to the Hive beeline CLI:

      cd /opt/hadoopclient
      source bigdata_env
      kinit Component service user (Skip this step if Kerberos authentication is disabled for the cluster, that is, the cluster is in normal mode.)
      beeline

    2. Run the following statements to create a Hive table in the default database (the partition field is c4):

      CREATE TABLE test_table(
        `c1` int,
        `c2` int,
        `c3` string)
      PARTITIONED BY (c4 string)
      row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;

    3. Run the following command to insert data to the Hive table:

      insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');

    4. Log in to the node where the MySQL client is installed and connect to the Doris database.

      If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command first:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      Then run the following command to connect to the Doris database:

      mysql -uDatabase login user -pDatabase login user password -PDatabase connection port -hIP address of Doris FE instance

      • The database connection port is the query connection port of the Doris FE. To obtain it, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and check the value of query_port of the Doris service.
      • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
      • You can also use the MySQL connection software or Doris WebUI to connect to the database.
      • If the Hive and Doris components are deployed across clusters, modify the following configurations:
        • The value of hadoop.rpc.protection of Doris in the cluster where Doris is located must be the same as that of HDFS in the cluster where Hive is located.
        • Change the value of -Djava.security.krb5.conf in the BROKER_GC_OPTS configuration item of DBroker in the cluster where Doris is located. Copy the $BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf file from any HiveServer node in the Hive cluster to any directory on any DBroker node in the Doris cluster.
    5. Run the following command to create a Doris table:

      CREATE TABLE example_db.test_t1 (
        `c1` int NOT NULL,
        `c4` date NULL,
        `c2` int NOT NULL,
        `c3` String NOT NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`c1`, `c4`)
      PARTITION BY RANGE(`c4`)
      (
        PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))
      )
      DISTRIBUTED BY HASH(`c1`) BUCKETS 1
      PROPERTIES (
        "replication_allocation" = "tag.location.default: 3",
        "dynamic_partition.enable" = "true",
        "dynamic_partition.time_unit" = "MONTH",
        "dynamic_partition.start" = "-2147483648",
        "dynamic_partition.end" = "2",
        "dynamic_partition.prefix" = "P_",
        "dynamic_partition.buckets" = "1",
        "in_memory" = "false",
        "storage_format" = "V2"
      );

    6. Run the following command to import data:
      • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

        LOAD LABEL broker_load_2022_03_23
        (
          DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_table/*/*")
          INTO TABLE test_t1
          COLUMNS TERMINATED BY ","
          (c1,c2,c3)
          COLUMNS FROM PATH AS (`c4`)
          SET
          (
            c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
          )
        )
        WITH BROKER "broker_192_168_67_78"
        (
          "hadoop.security.authentication"="kerberos",
          "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",
          "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.0/install/FusionInsight-Doris-1.2.3/doris-fe/bin/doris.keytab"
        )
        PROPERTIES
        (
          "timeout"="1200",
          "max_filter_ratio"="0.1"
        );

      • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

        LOAD LABEL broker_load_2022_03_23
        (
          DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_table/*/*")
          INTO TABLE test_t1
          COLUMNS TERMINATED BY ","
          (c1,c2,c3)
          COLUMNS FROM PATH AS (`c4`)
          SET
          (
            c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
          )
        )
        WITH BROKER "broker_192_168_67_78"
        (
          "username"="hdfs",
          "password"=""
        )
        PROPERTIES
        (
          "timeout"="1200",
          "max_filter_ratio"="0.1"
        );

      • To view the IP address of the active NameNode instance, log in to FusionInsight Manager and choose Cluster > Services > HDFS > Instances.
      • You can log in to FusionInsight Manager, choose Cluster > Services > HDFS > Configurations, and search for dfs.namenode.rpc.port to view the RPC port number.
      • broker_192_168_67_78 indicates the broker name. You can run the show broker; command on the MySQL client to view the broker name.
    7. Run the following command to check the status of the import task:

      show load order by createtime desc limit 1\G

      JobId: 41326624
      Label: broker_load_2022_03_23
      State: FINISHED
      Progress: ETL:100%; LOAD:100%
      Type: BROKER
      EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
      CreateTime: 2022-04-01 18:59:06
      EtlStartTime: 2022-04-01 18:59:11
      EtlFinishTime: 2022-04-01 18:59:11
      LoadStartTime: 2022-04-01 18:59:11
      LoadFinishTime: 2022-04-01 18:59:11
      URL: NULL
      JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
      1 row in set (0.01 sec)
    8. A Broker Load job can be canceled manually as long as its state is neither CANCELLED nor FINISHED. To cancel a job, specify its label in the following statement:

      CANCEL LOAD FROM Database name WHERE LABEL = "Label name";

      For example, to cancel the import job whose label is broker_load_2022_03_23 in database demo, run the following command:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
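
The COLUMNS FROM PATH AS (`c4`) clause in the import statement relies on the Hive storage layout: each partition of test_table is stored in a directory named c4=<value>, so the partition value comes from the file path rather than from the file contents. The following shell sketch mimics that extraction for a single path. It is an illustration only, not Doris code. The function name partition_value and the data file name 000000_0 are hypothetical.

```shell
# partition_value PATH COLUMN
# Prints the value of the "COLUMN=value" segment found in PATH.
# Returns 1 if PATH has no such segment.
partition_value() {
  path=$1
  col=$2
  case "$path" in
    */"$col"=*) ;;          # the path contains a "/COLUMN=" segment
    *) return 1 ;;
  esac
  rest=${path#*/"$col"=}    # drop everything up to and including "/COLUMN="
  printf '%s\n' "${rest%%/*}"   # keep the text before the next "/"
}

# Example with a hypothetical data file under the Hive warehouse directory
partition_value "/user/hive/warehouse/test_table/c4=2022-04-10/000000_0" c4   # prints 2022-04-10
```

The extracted string is what the SET clause then converts to a date with str_to_date.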

  • Importing Hive Table Data in ORC Format to Doris
    1. Run the following commands to log in to the Hive beeline CLI:

      cd /opt/hadoopclient
      source bigdata_env
      kinit Component service user (Skip this step if Kerberos authentication is disabled for the cluster, that is, the cluster is in normal mode.)
      beeline

    2. Run the following command to create a Hive table in ORC format in the default database:

      CREATE TABLE test_orc_tbl(
        `c1` int,
        `c2` int,
        `c3` string)
      PARTITIONED BY (c4 string)
      row format delimited fields terminated by ',' lines terminated by '\n' stored as orc;

    3. Log in to the node where the MySQL client is installed and connect to the Doris database.

      If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command first:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      Then run the following command to connect to the Doris database:

      mysql -uDatabase login user -pDatabase login user password -PDatabase connection port -hIP address of Doris FE instance

      • The database connection port is the query connection port of the Doris FE. To obtain it, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and check the value of query_port of the Doris service.
      • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
      • You can also use the MySQL connection software or Doris WebUI to connect to the database.
      • If the Hive and Doris components are deployed across clusters, modify the following configurations:
        • The value of hadoop.rpc.protection of Doris in the cluster where Doris is located must be the same as that of HDFS in the cluster where Hive is located.
        • Change the value of -Djava.security.krb5.conf in the BROKER_GC_OPTS configuration item of DBroker in the cluster where Doris is located. Copy the $BIGDATA_HOME/FusionInsight_HD_*/*_HiveServer/etc/kdc.conf file from any HiveServer node in the Hive cluster to any directory on any DBroker node in the Doris cluster.
    4. Run the following command to create a Doris table:

      CREATE TABLE example_db.test_orc_t1 (
        `c1` int NOT NULL,
        `c4` date NULL,
        `c2` int NOT NULL,
        `c3` String NOT NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`c1`, `c4`)
      PARTITION BY RANGE(`c4`)
      (
        PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))
      )
      DISTRIBUTED BY HASH(`c1`) BUCKETS 1
      PROPERTIES (
        "replication_allocation" = "tag.location.default: 3",
        "dynamic_partition.enable" = "true",
        "dynamic_partition.time_unit" = "MONTH",
        "dynamic_partition.start" = "-2147483648",
        "dynamic_partition.end" = "2",
        "dynamic_partition.prefix" = "P_",
        "dynamic_partition.buckets" = "1",
        "in_memory" = "false",
        "storage_format" = "V2"
      );

    5. Run the following command to import data using Broker Load:
      • Kerberos authentication is enabled for the cluster (the cluster is in security mode):

        LOAD LABEL broker_load_2022_03_24
        (
          DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_orc_tbl/*/*")
          INTO TABLE test_orc_t1
          FORMAT AS "orc"
          (c1,c2,c3)
          COLUMNS FROM PATH AS (`c4`)
          SET
          (
            c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
          )
        )
        WITH BROKER "broker_192_168_67_78"
        (
          "hadoop.security.authentication"="kerberos",
          "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",
          "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.0/install/FusionInsight-Doris-1.2.3/doris-fe/bin/doris.keytab"
        )
        PROPERTIES
        (
          "timeout"="1200",
          "max_filter_ratio"="0.1"
        );

      • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

        LOAD LABEL broker_load_2022_03_24
        (
          DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_orc_tbl/*/*")
          INTO TABLE test_orc_t1
          FORMAT AS "orc"
          (c1,c2,c3)
          COLUMNS FROM PATH AS (`c4`)
          SET
          (
            c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3
          )
        )
        WITH BROKER "broker_192_168_67_78"
        (
          "username"="hdfs",
          "password"=""
        )
        PROPERTIES
        (
          "timeout"="1200",
          "max_filter_ratio"="0.1"
        );

      • FORMAT AS "orc" : The format of the data to be imported is ORC.
      • SET: The field mapping between Hive tables and Doris tables and field conversion rules.
      • To view the IP address of the active NameNode instance, log in to FusionInsight Manager and choose Cluster > Services > HDFS > Instances.
      • You can log in to FusionInsight Manager, choose Cluster > Services > HDFS > Configurations, and search for dfs.namenode.rpc.port to view the RPC port number.
      • broker_192_168_67_78 indicates the broker name. You can run the show broker; command on the MySQL client to view the broker name.
    6. Run the following statement to check the status of the import task:

      show load order by createtime desc limit 1\G

    7. A Broker Load job can be canceled manually as long as its state is neither CANCELLED nor FINISHED. To cancel a job, specify its label in the following statement:

      CANCEL LOAD FROM Database name WHERE LABEL = "Label name";

      For example, to cancel the import job whose label is broker_load_2022_03_24 in database demo, run the following command:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_24";
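
The status check and the cancellation rule in the procedures above can be combined in a small shell helper. The following is a minimal sketch under stated assumptions, not part of the product: load_state and can_cancel are hypothetical function names, and the input is assumed to be the \G-style output of show load, with one "Field: value" pair per line.

```shell
# load_state: reads "show load ... \G" output on stdin and prints the
# value of the State field (for example FINISHED).
load_state() {
  awk -F': ' '$1 ~ /^[ \t]*State$/ { print $2; exit }'
}

# can_cancel STATE: succeeds (exit status 0) if a job in STATE can still
# be canceled, that is, STATE is neither CANCELLED nor FINISHED.
# An empty STATE (no job found) is treated as not cancelable.
can_cancel() {
  case "$1" in
    ""|CANCELLED|FINISHED) return 1 ;;
    *) return 0 ;;
  esac
}

# Usage sketch (placeholders must be replaced; not executed here):
#   state=$(mysql -u<user> -p<password> -P<port> -h<FE IP> -D<database> \
#             -e 'show load order by createtime desc limit 1\G' | load_state)
#   can_cancel "$state" && echo "job can still be canceled"
```

Keeping the parsing and the decision in separate functions makes it possible to check the rule without a running cluster.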

Related Parameter Configurations

The following configurations are system-level configurations of Broker Load and apply to all Broker Load import tasks.

Log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations > FE (Role) > Customization, and add the following parameters to the customized parameter fe.conf.customized.configs:

  • min_bytes_per_broker_scanner: specifies the minimum amount of data that can be processed by a single BE. The default value is 64 MB.
  • max_bytes_per_broker_scanner: specifies the maximum amount of data that can be processed by a single BE. The default value is 3 GB.
  • max_broker_concurrency: specifies the maximum number of concurrent import tasks in a job. The default value is 10.

The number of concurrent import tasks is determined by the minimum processing volume, the maximum number of concurrent tasks, the source file size, and the number of BE nodes in the current cluster:

  • Number of concurrent import tasks = Math.min (Source file size/Minimum processing volume, Maximum number of concurrent import tasks, Number of current BE nodes)
  • Processing volume of a single BE = Size of the source file/Number of concurrent import tasks

Usually, the maximum amount of data that an import job can handle is the value of max_bytes_per_broker_scanner multiplied by the number of BE nodes. To import a larger amount of data, increase the value of max_bytes_per_broker_scanner.
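
As a worked example of the two formulas above, the following shell sketch computes the number of concurrent import tasks and the processing volume per BE. The function name broker_concurrency is hypothetical. The sketch assumes integer division and raises a result below 1 to 1; neither detail is stated in this document. The cluster size of three BE nodes and the 6 GB file are example values.

```shell
# broker_concurrency FILE_BYTES MIN_BYTES_PER_SCANNER MAX_CONCURRENCY BE_NODES
# Prints min(FILE_BYTES / MIN_BYTES_PER_SCANNER, MAX_CONCURRENCY, BE_NODES).
# Assumptions: integer division, and a result below 1 is raised to 1.
broker_concurrency() {
  n=$(( $1 / $2 ))
  if [ "$n" -gt "$3" ]; then n=$3; fi
  if [ "$n" -gt "$4" ]; then n=$4; fi
  if [ "$n" -lt 1 ]; then n=1; fi
  echo "$n"
}

MB=1048576
GB=1073741824

# 6 GB source file, default 64 MB minimum, default concurrency 10, 3 BE nodes
tasks=$(broker_concurrency $((6 * $GB)) $((64 * $MB)) 10 3)
echo "concurrent tasks: $tasks"                  # prints "concurrent tasks: 3"
echo "bytes per BE: $(( 6 * $GB / tasks ))"      # prints "bytes per BE: 2147483648"
```

In this example each BE processes 2 GB, which is below the default max_bytes_per_broker_scanner of 3 GB. With the same three BE nodes, the default limit allows about 9 GB per job, so a 10 GB file would require a larger max_bytes_per_broker_scanner.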