Updated on 2024-12-13 GMT+08:00

Importing OBS Data to Doris with Broker Load

To import data to Doris with Stream Load, you need to read the data first by the client. Broker Load simplifies this process by sending import requests to Doris, and Doris pulls data. If the data you want to import is stored in object storage, Broker Load is the most convenient tool. With Broker Load, data is directly read and imported by Doris without passing through the client.

You need to import data with Broker Load through MySQL protocol and check the import result with the import command. Broker Load is suitable for the following scenes:

  • The source data is in a storage system that the broker can access, such as OBS.
  • The amount of data is at the level of tens to hundreds of GB.
  • Data in CSV, Parquet, ORC, and JSON formats can be imported. Only data in CSV format is supported by default.

The operations in this section are applicable to MRS 3.5.0 and later versions.

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The nodes to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      Log in to FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as the new user dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Using the MySQL Client to Connect to Doris.
  • You have prepared the data files to be imported to Doris.

Creating an OBS Parallel File System and Obtaining the AK/SK

Create an OBS parallel file system.

  1. Log in to the OBS console.
  2. Choose Parallel File Systems > Create Parallel File System.
  3. Enter a file system name, for example, doris-obs.

    The name of an enterprise project must be the same as that of the MRS cluster. Set other parameters.

  4. Click Create Now.
  5. In the parallel file system list, click the name of the one you just created and click Overview to obtain the endpoint information.

    After a service is deleted or a cluster is uninstalled, dirty data may remain in the parallel file system created in 2 to 4. You need to delete the dirty data.

  6. Click Files in the navigation pane, click Create Folder, enter a folder name, for example, test, and click OK.
  7. Click the name of the new folder and click Upload File to upload the data, for example, to test_data.csv.

Obtain AK/SK information.

  1. Move the cursor on the username in the upper right corner, and select My Credentials from the drop-down list.
  2. Click Access Keys, click Create Access Key, and enter the verification code or password. Click OK and download the access keys. Obtain the AK/SK information from the .csv file.

Importing OBS Data to a Doris Table

  1. Log in to the node where MySQL is installed and connect the Doris database.

    If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command to connect to the Doris database:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -uDatabase login user -p -PConnection port for FE queries -hIP address of the Doris FE instance

    Enter the password for logging in to the database.

    • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
    • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
    • You can also use the MySQL connection software or Doris web UI to connect the database.

  2. Create a database.

    create database test_broker_load;

    use test_broker_load;

  3. Create a table and import OBS data to the table.

    CREATE TABLE IF NOT EXISTS test

    (

    `user_id` LARGEINT NOT NULL COMMENT "User ID",

    `city` VARCHAR(20) COMMENT "City",

    `age` SMALLINT COMMENT "Age",

    `gender` TINYINT COMMENT "Gender",

    `cost` BIGINT SUM DEFAULT "0" COMMENT "Total consumption",

    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "Maximum dwell time",

    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Minimum dwell time"

    )

    AGGREGATE KEY(`user_id`, `city`, `age`, `gender`)

    DISTRIBUTED BY HASH(`user_id`) BUCKETS 100

    PROPERTIES (

    "replication_allocation" = "tag.location.default: 3"

    );

    LOAD LABEL brokerload_test_label001

    (

    DATA INFILE("obs://Parallel file system name/test/test_data.csv")

    INTO TABLE `test`

    COLUMNS TERMINATED BY ','

    FORMAT AS "csv"

    )

    WITH BROKER "broker1"

    (

    "fs.obs.access.key" = "xxx",

    "fs.obs.secret.key" = "xxx",

    "fs.obs.endpoint" = "xxx"

    );

    • LOAD LABEL: unique label that must be specified for each import task. You can use this label to view the job progress.
    • DATA INFILE: OBS path of the data files (uploaded in 7) you want to import to Doris.
    • COLUMNS TERMINATED BY: column separator. This parameter is required only when the CSV format is used. Only a single-byte separator can be specified.
    • FORMAT AS: file type. The value can be CSV, JSON, PARQUET, or ORC. The default value is CSV.
    • WITH BROKER: name of the broker service to be used. You can run the following command to view the broker information of the cluster:

      show broker;

    • fs.obs.access.key: AK information obtained in 9
    • fs.obs.secret.key: SK information obtained in 9
    • fs.obs.endpoint: endpoint information obtained in 5

  4. Check the task progress.

    show load order by createtime desc limit 1\G;

    If the value of State in the command output changes to FINISHED, the data import is complete.

    JobId: 296805
    Label: brokerload_test_csv_label001
    State: FINISHED
    Progress: ETL:100%; LOAD:100%
    Type: BROKER
    EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
    TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
     ErrorMsg: NULL
    ...

  5. Query table data.

    select * from test;