Updated on 2024-11-29 GMT+08:00

Broker Load

Broker load is an asynchronous import method, and the supported data sources depend on the data sources supported by the Broker process.

Data in the Doris table is ordered. Broker load uses the Doris cluster resources to sort the data when importing data. Comparing with massive historical data migration with Spark load, this method user a large amount of Doris cluster resources. Broker load is used when users do not have Spark computing resources. If there are Spark computing resources, Spark load is recommended.

You need to import data with Broker Load through MySQL protocol and check the import result by viewing the import command. Broker Load is suitable for the following scenes:

  • The source data is in a storage system that the broker can access, such as HDFS.
  • The data volume ranges from tens to hundreds of GB.
  • Data in CSV, Parquet, and ORC formats can be imported. Only data in CSV format is supported by default.

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The nodes to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as the new user dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Installing a MySQL Client.
  • The DBroker instance has been installed and started in Doris.
  • The Hive client has been installed.

Importing Hive Table Data to Doris

  • Import Hive table data in text format to Doris.
    1. Run the following commands to log in to the Hive beeline CLI:

      cd /opt/hadoopclient

      source bigdata_env

      kinit Component service user (If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), skip this step.)

      beeline

    2. Run the following statements to create a Hive table in the default database (the partition field is c4):

      CREATE TABLE test_table(

      `c1` int,

      `c2` int,

      `c3` string)

      PARTITIONED BY (c4 string)

      row format delimited fields terminated by ','lines terminated by '\n' stored as textfile ;

    3. Run the following command to insert data to the Hive table:

      insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');

    4. Log in to the node where MySQL is installed and connect the Doris database.

      If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command to connect the Doris database:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      mysql -uDatabase login username -pDatabase login password -PConnection port for FE queries -hIP address of the Doris FE instance

      • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
      • To obtain the IP address of the Doris FE nstance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
      • You can also use the MySQL connection software or Doris web UI to connect the database.
    5. Run the following command to create a Doris table:

      CREATE TABLE example_db.test_t1 (

      `c1` int NOT NULL,

      `c4` date NULL,

      `c2` int NOT NULL,

      `c3` String NOT NULL

      ) ENGINE=OLAP

      UNIQUE KEY(`c1`, `c4`)

      PARTITION BY RANGE(`c4`)

      (

      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))

      DISTRIBUTED BY HASH(`c1`) BUCKETS 1

      PROPERTIES (

      "replication_allocation" = "tag.location.default: 3",

      "dynamic_partition.enable" = "true",

      "dynamic_partition.time_unit" = "MONTH",

      "dynamic_partition.start" = "-2147483648",

      "dynamic_partition.end" = "2",

      "dynamic_partition.prefix" = "P_",

      "dynamic_partition.buckets" = "1",

      "in_memory" = "false",

      "storage_format" = "V2"

      );

    6. Run the following command to import data:
      • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

        LOAD LABEL broker_load_2022_03_23

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_table/*/*")

        INTO TABLE test_t1

        COLUMNS TERMINATED BY ","

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "hadoop.security.authentication"="kerberos",

        "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",

        "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.1/install/FusionInsight-Doris-2.0.3/doris-fe/bin/doris.keytab"

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

        LOAD LABEL broker_load_2022_03_23

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_table/*/*")

        INTO TABLE test_t1

        COLUMNS TERMINATED BY ","

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "username"="hdfs",

        "password"=""

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • To view the IP address of the active NameNode instance, log in to FusionInsight Manager and choose Cluster > Services > HDFS > Instances.
      • You can log in to FusionInsight Manager, choose Cluster > Services > HDFS > Configurations, and search for dfs.namenode.rpc.port to view the RPC port number.
      • broker_192_168_67_78 indicates the broker name. You can run the show broker; command on the MySQL client to view the broker name.
      • Commands carrying authentication passwords pose security risks. Disable historical command recording before running such commands to prevent information leakage.
    7. Run the following statement to check the status of the import job:

      show load order by createtime desc limit 1\G;

      JobId: 41326624
      Label: broker_load_2022_03_23
      State: FINISHED
      Progress: ETL:100%; LOAD:100%
      Type: BROKER
      EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
      CreateTime: 2022-04-01 18:59:06
      EtlStartTime: 2022-04-01 18:59:11
      EtlFinishTime: 2022-04-01 18:59:11
      LoadStartTime: 2022-04-01 18:59:11
      LoadFinishTime: 2022-04-01 18:59:11
      URL: NULL
      JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
      1 row in set (0.01 sec)
    8. You can manually cancel an import task whose Broker Load job status is not CANCELLED or FINISHED. To cancel an import task, you need to specify the label of the import task. The statement is as follows:

      CANCEL LOAD FROM Database name WHERE LABEL = "Label name";

      For example, to cancel the import job whose label is broker_load_2022_03_23 in database demo, run the following command:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

  • Import Hive table data in ORC format to Doris
    1. Run the following commands to log in to the Hive beeline CLI:

      cd /opt/hadoopclient

      source bigdata_env

      kinit Component service user (If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), skip this step.)

      beeline

    2. Run the following statement to create a Hive table in ORC format in the default database:

      CREATE TABLE test_orc_tbl(

      `c1` int,

      `c2` int,

      `c3` string)

      PARTITIONED BY (c4 string)

      row format delimited fields terminated by ','lines terminated by '\n' stored as orc;

    3. Log in to the node where MySQL is installed and connect the Doris database.

      If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command to connect the Doris database:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      mysql -uDatabase login username -pDatabase login password -PConnection port for FE queries -hIP address of the Doris FE instance

      • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
      • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
      • You can also use the MySQL connection software or Doris web UI to connect the database.
    4. Run the following statement to create a Doris table:

      CREATE TABLE example_db.test_orc_t1 (

      `c1` int NOT NULL,

      `c4` date NULL,

      `c2` int NOT NULL,

      `c3` String NOT NULL

      ) ENGINE=OLAP

      UNIQUE KEY(`c1`, `c4`)

      PARTITION BY RANGE(`c4`)

      (

      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))

      DISTRIBUTED BY HASH(`c1`) BUCKETS 1

      PROPERTIES (

      "replication_allocation" = "tag.location.default: 3",

      "dynamic_partition.enable" = "true",

      "dynamic_partition.time_unit" = "MONTH",

      "dynamic_partition.start" = "-2147483648",

      "dynamic_partition.end" = "2",

      "dynamic_partition.prefix" = "P_",

      "dynamic_partition.buckets" = "1",

      "in_memory" = "false",

      "storage_format" = "V2"

      );

    5. Run the following statement to import data using Broker Load:
      • Kerberos authentication is enabled for the cluster (the cluster is in security mode):

        LOAD LABEL broker_load_2022_03_24

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_orc_tbl/*/*")

        INTO TABLE test_orc_t1

        COLUMNS TERMINATED BY ","

        FORMAT AS "orc"

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "hadoop.security.authentication"="kerberos",

        "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",

        "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.1/install/FusionInsight-Doris-2.0.3/doris-fe/bin/doris.keytab"

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

        LOAD LABEL broker_load_2022_03_24

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_orc_tbl/*/*")

        INTO TABLE test_orc_t1

        COLUMNS TERMINATED BY ","

        FORMAT AS "orc"

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        'username"="hdfs",

        'password"=""

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • FORMAT AS "orc" : The data to be imported is in ORC format.
      • SET: The field mapping between Hive tables and Doris tables and field conversion rules.
      • To view the IP address of the active NameNode instance, log in to FusionInsight Manager and choose Cluster > Services > HDFS > Instances.
      • You can log in to FusionInsight Manager, choose Cluster > Services > HDFS > Configurations, and search for dfs.namenode.rpc.port to view the RPC port number.
      • broker_192_168_67_78 indicates the broker name. You can run the show broker; command on the MySQL client to view the broker name.
    6. Run the following statement to check the status of the imported task:

      show load order by createtime desc limit 1\G;

    7. You can manually cancel an import task whose Broker Load job status is not CANCELLED or FINISHED. To cancel an import task, you need to specify the label of the import task. The statement is as follows:

      CANCEL LOAD FROM Database name WHERE LABEL = "Label name";

      For example, to cancel the import job whose label is broker_load_2022_03_23 in database demo, run the following command:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

Related Parameter Configurations

The following configurations take effect in the whole system for Broker load and apply to all Broker load import jobs.

Log in to FusionInsight Manager, choose Cluster > Services > Doris and click the Configurations tab. On the displayed page, choose FE (Role) > Customization, and add the following parameters to fe.conf.customized.configs:

  • min_bytes_per_broker_scanner: specifies the minimum amount of data that can be processed by a single BE. The default value is 64 MB. The value must be in bytes.
  • max_bytes_per_broker_scanner: specifies the maximum amount of data that can be processed by a single BE. The default value is 3 GB. The value must be in bytes.
  • max_broker_concurrency: specifies the maximum number of concurrent import tasks in a job. The default value is 10.

The minimum data volume allowed, maximum number of concurrent jobs, source file size, and number of BE nodes in the current cluster determine the number of concurrent import jobs can be processed.

  • Number of concurrent import jobs = Math.min (Source file size/Minimum data volume allowed, Maximum number of concurrent import jobs, Number of current BE nodes)
  • Minimum data volume processed by a single BE = Size of the source file/Number of concurrent import jobs

Usually the maximum amount of data supported by an import job is the product of the value of max_bytes_per_broker_scanner and the number of BE nodes. To import a larger amount of data, you need to adjust the value of max_bytes_per_broker_scanner.