Broker Load

Updated on 2024-11-29 GMT+08:00

Broker load is an asynchronous import method, and the supported data sources depend on the data sources supported by the Broker process.

Data in a Doris table is ordered, so Broker Load uses Doris cluster resources to sort the data during import. Compared with Spark Load, this consumes a large amount of Doris cluster resources when migrating massive volumes of historical data. Use Broker Load when no Spark computing resources are available. If Spark computing resources are available, Spark Load is recommended.

You create a Broker Load import job over the MySQL protocol and check the import result by running the show load command. Broker Load is suitable for the following scenarios:

  • The source data is in a storage system that the broker can access, such as HDFS.
  • The data volume ranges from tens to hundreds of GB.
  • The data is in CSV, Parquet, or ORC format. CSV is the default format.

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The nodes to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as the new user dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Installing a MySQL Client.
  • The DBroker instance has been installed and started in Doris.
  • The Hive client has been installed.

Importing Hive Table Data to Doris

  • Import Hive table data in text format to Doris.
    1. Run the following commands to log in to the Hive beeline CLI:

      cd /opt/hadoopclient

      source bigdata_env

      kinit Component service user (If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), skip this step.)

      beeline

    2. Run the following statements to create a Hive table in the default database (the partition field is c4):

      CREATE TABLE test_table(

      `c1` int,

      `c2` int,

      `c3` string)

      PARTITIONED BY (c4 string)

      row format delimited fields terminated by ','lines terminated by '\n' stored as textfile ;

    3. Run the following statement to insert data into the Hive table:

      insert into table test_table values(1,1,'1','2022-04-10'),(2,2,'2','2022-04-22');

    4. Log in to the node where the MySQL client is installed and connect to the Doris database.

      If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following commands to connect to the Doris database:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      mysql -uDatabase login username -pDatabase login password -PConnection port for FE queries -hIP address of the Doris FE instance

      NOTE:
      • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
      • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
      • You can also use MySQL connection software or the Doris web UI to connect to the database.
    5. Run the following command to create a Doris table:

      CREATE TABLE example_db.test_t1 (

      `c1` int NOT NULL,

      `c4` date NULL,

      `c2` int NOT NULL,

      `c3` String NOT NULL

      ) ENGINE=OLAP

      UNIQUE KEY(`c1`, `c4`)

      PARTITION BY RANGE(`c4`)

      (

      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))

      DISTRIBUTED BY HASH(`c1`) BUCKETS 1

      PROPERTIES (

      "replication_allocation" = "tag.location.default: 3",

      "dynamic_partition.enable" = "true",

      "dynamic_partition.time_unit" = "MONTH",

      "dynamic_partition.start" = "-2147483648",

      "dynamic_partition.end" = "2",

      "dynamic_partition.prefix" = "P_",

      "dynamic_partition.buckets" = "1",

      "in_memory" = "false",

      "storage_format" = "V2"

      );

    6. Run the following command to import data:
      • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

        LOAD LABEL broker_load_2022_03_23

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_table/*/*")

        INTO TABLE test_t1

        COLUMNS TERMINATED BY ","

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "hadoop.security.authentication"="kerberos",

        "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",

        "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.1/install/FusionInsight-Doris-2.0.3/doris-fe/bin/doris.keytab"

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

        LOAD LABEL broker_load_2022_03_23

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_table/*/*")

        INTO TABLE test_t1

        COLUMNS TERMINATED BY ","

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "username"="hdfs",

        "password"=""

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      NOTE:
      • To view the IP address of the active NameNode instance, log in to FusionInsight Manager and choose Cluster > Services > HDFS > Instances.
      • You can log in to FusionInsight Manager, choose Cluster > Services > HDFS > Configurations, and search for dfs.namenode.rpc.port to view the RPC port number.
      • broker_192_168_67_78 indicates the broker name. You can run the show broker; command on the MySQL client to view the broker name.
      • Commands carrying authentication passwords pose security risks. Disable historical command recording before running such commands to prevent information leakage.
    7. Run the following statement to check the status of the import job:

      show load order by createtime desc limit 1\G;

      JobId: 41326624
      Label: broker_load_2022_03_23
      State: FINISHED
      Progress: ETL:100%; LOAD:100%
      Type: BROKER
      EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
      CreateTime: 2022-04-01 18:59:06
      EtlStartTime: 2022-04-01 18:59:11
      EtlFinishTime: 2022-04-01 18:59:11
      LoadStartTime: 2022-04-01 18:59:11
      LoadFinishTime: 2022-04-01 18:59:11
      URL: NULL
      JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
      1 row in set (0.01 sec)
    8. You can manually cancel an import task whose Broker Load job status is not CANCELLED or FINISHED. To cancel an import task, you need to specify the label of the import task. The statement is as follows:

      CANCEL LOAD FROM Database name WHERE LABEL = "Label name";

      For example, to cancel the import job whose label is broker_load_2022_03_23 in database demo, run the following command:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

  • Import Hive table data in ORC format to Doris
    1. Run the following commands to log in to the Hive beeline CLI:

      cd /opt/hadoopclient

      source bigdata_env

      kinit Component service user (If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), skip this step.)

      beeline

    2. Run the following statement to create a Hive table in ORC format in the default database:

      CREATE TABLE test_orc_tbl(

      `c1` int,

      `c2` int,

      `c3` string)

      PARTITIONED BY (c4 string)

      row format delimited fields terminated by ','lines terminated by '\n' stored as orc;

    3. Log in to the node where the MySQL client is installed and connect to the Doris database.

      If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following commands to connect to the Doris database:

      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

      mysql -uDatabase login username -pDatabase login password -PConnection port for FE queries -hIP address of the Doris FE instance

      NOTE:
      • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
      • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
      • You can also use MySQL connection software or the Doris web UI to connect to the database.
    4. Run the following statement to create a Doris table:

      CREATE TABLE example_db.test_orc_t1 (

      `c1` int NOT NULL,

      `c4` date NULL,

      `c2` int NOT NULL,

      `c3` String NOT NULL

      ) ENGINE=OLAP

      UNIQUE KEY(`c1`, `c4`)

      PARTITION BY RANGE(`c4`)

      (

      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))

      DISTRIBUTED BY HASH(`c1`) BUCKETS 1

      PROPERTIES (

      "replication_allocation" = "tag.location.default: 3",

      "dynamic_partition.enable" = "true",

      "dynamic_partition.time_unit" = "MONTH",

      "dynamic_partition.start" = "-2147483648",

      "dynamic_partition.end" = "2",

      "dynamic_partition.prefix" = "P_",

      "dynamic_partition.buckets" = "1",

      "in_memory" = "false",

      "storage_format" = "V2"

      );

    5. Run the following statement to import data using Broker Load:
      • Kerberos authentication is enabled for the cluster (the cluster is in security mode):

        LOAD LABEL broker_load_2022_03_24

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_orc_tbl/*/*")

        INTO TABLE test_orc_t1

        COLUMNS TERMINATED BY ","

        FORMAT AS "orc"

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "hadoop.security.authentication"="kerberos",

        "kerberos_principal"="doris/hadoop.hadoop.com@HADOOP.COM",

        "kerberos_keytab"="${BIGDATA_HOME}/FusionInsight_Doris_8.3.1/install/FusionInsight-Doris-2.0.3/doris-fe/bin/doris.keytab"

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

        LOAD LABEL broker_load_2022_03_24

        (

        DATA INFILE("hdfs://IP address of the active NameNode instance:RPC port number/user/hive/warehouse/test_orc_tbl/*/*")

        INTO TABLE test_orc_t1

        COLUMNS TERMINATED BY ","

        FORMAT AS "orc"

        (c1,c2,c3)

        COLUMNS FROM PATH AS (`c4`)

        SET

        (

        c4 = str_to_date(`c4`,'%Y-%m-%d'),c1=c1,c2=c2,c3=c3

        )

        )

        WITH BROKER "broker_192_168_67_78"

        (

        "username"="hdfs",

        "password"=""

        )

        PROPERTIES

        (

        "timeout"="1200",

        "max_filter_ratio"="0.1"

        );

      NOTE:
      • FORMAT AS "orc" : The data to be imported is in ORC format.
      • SET: The field mapping between Hive tables and Doris tables and field conversion rules.
      • To view the IP address of the active NameNode instance, log in to FusionInsight Manager and choose Cluster > Services > HDFS > Instances.
      • You can log in to FusionInsight Manager, choose Cluster > Services > HDFS > Configurations, and search for dfs.namenode.rpc.port to view the RPC port number.
      • broker_192_168_67_78 indicates the broker name. You can run the show broker; command on the MySQL client to view the broker name.
    6. Run the following statement to check the status of the import job:

      show load order by createtime desc limit 1\G;

    7. You can manually cancel an import task whose Broker Load job status is not CANCELLED or FINISHED. To cancel an import task, you need to specify the label of the import task. The statement is as follows:

      CANCEL LOAD FROM Database name WHERE LABEL = "Label name";

      For example, to cancel the import job whose label is broker_load_2022_03_23 in database demo, run the following command:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
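The status check and the cancel rule in the procedures above can be sketched in a few lines of Python. This is only an illustration, not part of the MRS or Doris tooling: it parses the vertical output of show load (as printed in the text-format procedure) and applies the rule that a job can be cancelled only while its state is neither CANCELLED nor FINISHED. The function names parse_vertical_row and can_cancel are hypothetical, and the sample text is an excerpt of the output shown earlier.

```python
# Excerpt of the vertical "show load" output shown in the text-format procedure.
SAMPLE = """
JobId: 41326624
Label: broker_load_2022_03_23
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
ErrorMsg: NULL
1 row in set (0.01 sec)
"""

# States in which, per the procedure, a job can no longer be cancelled.
TERMINAL_STATES = {"CANCELLED", "FINISHED"}


def parse_vertical_row(text: str) -> dict:
    """Hypothetical helper: turn 'Key: value' lines into a dict.

    Lines without a ': ' separator (row banners, the trailing summary)
    are ignored. Only the first ': ' on a line splits key from value,
    so values such as 'ETL:100%; LOAD:100%' stay intact.
    """
    row = {}
    for line in text.splitlines():
        key, sep, value = line.strip().partition(": ")
        if sep:
            row[key] = value
    return row


def can_cancel(row: dict) -> bool:
    """A job can be cancelled only if it is not CANCELLED or FINISHED."""
    return row.get("State") not in TERMINAL_STATES


row = parse_vertical_row(SAMPLE)
print(row["Label"], row["State"], "cancellable:", can_cancel(row))
```

In practice the text would come from the MySQL client session. The sketch leaves out the connection so that it has no dependency on a particular driver.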

Related Parameter Configurations

The following settings are system-wide and apply to all Broker Load import jobs.

Log in to FusionInsight Manager, choose Cluster > Services > Doris and click the Configurations tab. On the displayed page, choose FE (Role) > Customization, and add the following parameters to fe.conf.customized.configs:

  • min_bytes_per_broker_scanner: specifies the minimum amount of data that can be processed by a single BE. The default value is 64 MB. The value must be in bytes.
  • max_bytes_per_broker_scanner: specifies the maximum amount of data that can be processed by a single BE. The default value is 3 GB. The value must be in bytes.
  • max_broker_concurrency: specifies the maximum number of concurrent import tasks in a job. The default value is 10.
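Because both scanner limits must be entered in bytes, it can help to convert the documented defaults first. The following is a minimal sketch, assuming binary units (1 MB = 1024 × 1024 bytes), which this page does not state explicitly. The helper to_bytes is hypothetical, and the name=value lines are printed only for readability; enter the values in FusionInsight Manager as described above.

```python
# Convert the documented defaults to byte values.
# Assumption: binary units, i.e. 1 MB = 1024**2 bytes and 1 GB = 1024**3 bytes.
UNITS = {"MB": 1024**2, "GB": 1024**3}


def to_bytes(value: int, unit: str) -> int:
    """Hypothetical helper: convert a size such as (64, "MB") to bytes."""
    return value * UNITS[unit]


settings = {
    "min_bytes_per_broker_scanner": to_bytes(64, "MB"),  # default 64 MB
    "max_bytes_per_broker_scanner": to_bytes(3, "GB"),   # default 3 GB
    "max_broker_concurrency": 10,                        # default 10
}

for name, value in settings.items():
    print(f"{name}={value}")
# min_bytes_per_broker_scanner=67108864
# max_bytes_per_broker_scanner=3221225472
# max_broker_concurrency=10
```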

The number of concurrent import tasks in a job is determined by min_bytes_per_broker_scanner, max_broker_concurrency, the source file size, and the number of BE nodes in the current cluster:

  • Number of concurrent import tasks = Math.min(Source file size/min_bytes_per_broker_scanner, max_broker_concurrency, Number of current BE nodes)
  • Data volume processed by a single BE in the import = Source file size/Number of concurrent import tasks

Typically, the maximum amount of data that one import job can handle is the value of max_bytes_per_broker_scanner multiplied by the number of BE nodes. To import a larger amount of data, increase the value of max_bytes_per_broker_scanner.
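As a worked example of the sizing rules in this section, the sketch below computes the concurrency, the data volume each BE would process, and the job-size ceiling for a hypothetical 100 GB source on a 3-node BE cluster with the default settings. The function name broker_load_plan is illustrative. Floor division and a lower bound of one concurrent task are assumptions, since the page does not say how fractions are rounded.

```python
def broker_load_plan(file_size, min_bytes, max_bytes, max_concurrency, be_nodes):
    """Apply the sizing formulas from this section (all sizes in bytes).

    Returns (concurrency, bytes_per_be, max_job_bytes).
    """
    # Concurrency = min(file size / min bytes per scanner,
    #                   max concurrency, number of BE nodes)
    # Assumption: floor division, and never fewer than one task.
    concurrency = max(1, min(file_size // min_bytes, max_concurrency, be_nodes))
    # Data volume handled by a single BE in this import.
    bytes_per_be = file_size / concurrency
    # Ceiling on the data one job can import.
    max_job_bytes = max_bytes * be_nodes
    return concurrency, bytes_per_be, max_job_bytes


GB = 1024**3
MB = 1024**2

concurrency, bytes_per_be, max_job_bytes = broker_load_plan(
    file_size=100 * GB,   # hypothetical source size
    min_bytes=64 * MB,    # min_bytes_per_broker_scanner default
    max_bytes=3 * GB,     # max_bytes_per_broker_scanner default
    max_concurrency=10,   # max_broker_concurrency default
    be_nodes=3,           # hypothetical cluster size
)

print("concurrent tasks:", concurrency)                # 3
print("GB per BE:", round(bytes_per_be / GB, 1))       # 33.3
print("job ceiling in GB:", max_job_bytes // GB)       # 9
print("fits within ceiling:", 100 * GB <= max_job_bytes)  # False
```

Under these assumptions the 100 GB source exceeds the 9 GB ceiling, which is the case where max_bytes_per_broker_scanner needs to be raised.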
