Updated on 2024-12-13 GMT+08:00

Configuring Spark to Read and Write Doris Data

The Spark Doris Connector allows Spark to read data stored in Doris and to write data to Doris.

  • Data can be read from Doris.
  • Spark DataFrames can be written to Doris in batch or streaming mode.
  • You can map a Doris table to a DataFrame or an RDD. DataFrame is recommended.
  • Data can be filtered on the Doris side to reduce the amount of data transferred.
  • The operations in this section are applicable to MRS 3.5.0 and later versions.
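The list above recommends the DataFrame mapping and mentions filtering on the Doris side. As a minimal sketch, assuming PySpark and the option names documented for the open-source Spark Doris Connector (doris.table.identifier, doris.fenodes, user, password, doris.filter.query), the connector options can be assembled in one place. The helper name doris_options is hypothetical, and the assumption that the DataFrame API accepts the same two HTTPS switches used in this section's OPTIONS clause should be checked against the connector version shipped with your MRS cluster.

```python
from typing import Dict, Optional


def doris_options(table: str, fenodes: str, user: str, password: str,
                  filter_query: Optional[str] = None,
                  https: bool = False) -> Dict[str, str]:
    """Build options for spark.read / df.write with format("doris").

    Hypothetical helper: option names follow the open-source Spark Doris
    Connector documentation and may differ between connector versions.
    """
    if "." not in table:
        raise ValueError("table must be given as <database>.<table>")
    opts = {
        "doris.table.identifier": table,
        "doris.fenodes": fenodes,  # "<FE host>:<HTTP or HTTPS port>"
        "user": user,
        "password": password,
    }
    if filter_query:
        # Passed through to Doris so rows are filtered before reaching Spark.
        opts["doris.filter.query"] = filter_query
    if https:
        # Assumed to mirror the HTTPS switches of the SQL OPTIONS clause.
        opts["doris.enable.https"] = "true"
        opts["doris.ignore.https.ca"] = "true"
    return opts


# Usage with PySpark (needs a SparkSession `spark` with the connector JAR):
#   read_opts = doris_options("sparkconnector.spark_connector_test_decimal",
#                             "<FE instance IP address>:<Port number>",
#                             "dorisuser", "<User password>",
#                             filter_query="c1 > 10001", https=True)
#   df = spark.read.format("doris").options(**read_opts).load()
#   write_opts = doris_options("sparkconnector.spark_connector_test_decimal",
#                              "<FE instance IP address>:<Port number>",
#                              "dorisuser", "<User password>", https=True)
#   df.write.format("doris").options(**write_opts).save()
```

Keeping the filter in doris.filter.query rather than in a Spark-side df.filter is what lets Doris drop rows before they are transferred.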

Prerequisites

  • A cluster containing the Doris service has been created, and all services in the cluster are running properly.
  • The node to be connected to the Doris database can communicate with the MRS cluster.
  • A user with Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode):

      Log in to FusionInsight Manager, create a human-machine user (for example, dorisuser), create a role with Doris administrator permissions, and bind the role to the user.

      Log in to FusionInsight Manager as dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode):

      Connect to Doris as user admin, create a user (for example, dorisuser) and a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Using the MySQL Client to Connect to Doris.
  • The Spark client has been installed.

Procedure

Create a table in Doris and insert data into the table.

  1. Log in to the node where the MySQL client is installed and connect to the Doris database.

    If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following commands:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -u<Database login username> -p<Database login password> -P<Database connection port> -h<IP address of the Doris FE instance>

    • The database connection port is the query connection port of the Doris FE. To find it, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for query_port.
    • To obtain the IP address of the Doris FE instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the IP address of any FE instance.
    • You can also connect to the database using a MySQL client tool or the Doris WebUI.

  2. Run the following commands to create a database and switch to it:

    create database if not exists sparkconnector;

    use sparkconnector;

  3. Run the following statement to create a table:

    CREATE TABLE spark_connector_test_decimal (
      c1 int NOT NULL,
      c2 VARCHAR(25) NOT NULL,
      c3 VARCHAR(152),
      c4 boolean,
      c5 tinyint,
      c6 smallint,
      c7 bigint,
      c8 float,
      c9 double,
      c10 date,
      c11 datetime,
      c12 char,
      c13 largeint,
      c14 varchar,
      c15 decimal(15, 5)
    )
    DUPLICATE KEY(`c1`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`c1`) BUCKETS 1;

  4. Run the following commands to insert data into the table:

    insert into spark_connector_test_decimal values(10000,'aaa','abc',true, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    insert into spark_connector_test_decimal values(10001,'aaa','abc',false, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    insert into spark_connector_test_decimal values(10002,'aaa','abc',True, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    insert into spark_connector_test_decimal values(10003,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
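Steps 1 to 4 can also be scripted. The sketch below, assuming Python 3 and the mysql client on the PATH, builds the connection command from step 1 and folds single-row INSERT statements like the four above into one multi-row INSERT ... VALUES (...), (...) statement, so the rows are loaded by one statement instead of four. The function names mysql_command, sql_literal, and multi_row_insert are hypothetical. Literal rendering is deliberately minimal: strings containing quotes or backslashes are rejected instead of escaped, so this is not a replacement for a proper database driver.

```python
import os
from datetime import date, datetime
from decimal import Decimal
from typing import Dict, List, Sequence, Tuple


def mysql_command(user: str, password: str, port: int, fe_host: str,
                  security_mode: bool = True) -> Tuple[List[str], Dict[str, str]]:
    """Return (argv, env) for connecting to the Doris FE with mysql."""
    env = dict(os.environ)
    if security_mode:
        # Step 1 sets this for Kerberos-enabled (security mode) clusters.
        env["LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN"] = "1"
    # -p must be followed directly by the password; with a space, mysql
    # prompts for the password and treats the next word as a database name.
    argv = ["mysql", f"-u{user}", f"-p{password}", f"-P{port}", f"-h{fe_host}"]
    return argv, env


def sql_literal(value: object) -> str:
    """Render one Python value as a SQL literal (minimal sketch)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, (int, float, Decimal)):
        return str(value)
    if isinstance(value, datetime):  # before date: datetime subclasses date
        return "'" + value.strftime("%Y-%m-%d %H:%M:%S") + "'"
    if isinstance(value, date):
        return "'" + value.isoformat() + "'"
    if isinstance(value, str):
        if "'" in value or "\\" in value:
            raise ValueError(f"unsupported characters in {value!r}")
        return f"'{value}'"
    raise TypeError(f"unsupported type: {type(value).__name__}")


def multi_row_insert(table: str, rows: Sequence[Sequence[object]]) -> str:
    """Build one INSERT ... VALUES statement covering all rows."""
    if not rows:
        raise ValueError("rows must not be empty")
    values = ",\n".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return f"insert into {table} values\n{values};"


# Usage (subprocess must be imported; query_port and fe_ip as in step 1):
#   row = (10000, "aaa", "abc", True, 100, 3000, 100000, 1234.567, 12345.678,
#          date(2022, 12, 1), datetime(2022, 12, 1, 12), "a", 200000, "g",
#          Decimal("1000.12345"))
#   stmt = multi_row_insert("sparkconnector.spark_connector_test_decimal", [row])
#   argv, env = mysql_command("dorisuser", password, query_port, fe_ip)
#   subprocess.run(argv, env=env, input=stmt, text=True, check=True)
```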

Read and write the Doris table using Spark.

  1. Run the following commands to start the spark-sql client:

    cd <Spark client installation directory>

    source bigdata_env

    kinit <Component service user>

    spark-sql --master yarn

    Skip the kinit command if Kerberos authentication is disabled for the cluster (the cluster is in normal mode).

  2. Run the following command to create a temporary view:

    CREATE TEMPORARY VIEW spark_doris_decimal
    USING doris
    OPTIONS(
      "table.identifier"="sparkconnector.spark_connector_test_decimal",
      "fenodes"="<FE instance IP address>:<Port number>",
      "user"="dorisuser",
      "password"="<User password>",
      'doris.enable.https' = 'true',
      'doris.ignore.https.ca' = 'true'
    );

    Run the following command to query the data in the Doris table:

    select * from spark_doris_decimal;

    Run the following command to insert data into the Doris table:

    insert into spark_doris_decimal values(10005,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    • If the cluster uses HTTP instead of HTTPS, delete the following parameters from the OPTIONS clause of the CREATE TEMPORARY VIEW statement:
      • 'doris.enable.https' = 'true'
      • 'doris.ignore.https.ca' = 'true'
    • The port number is the HTTPS port of the FE service (for clusters with Kerberos authentication enabled) or the HTTP port (for clusters with Kerberos authentication disabled). To find it, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port or http_port.
    • When importing a large amount of data, you can tune the following parameters to improve write performance:
      • sink.batch.size: maximum number of rows written to a BE in a single batch. The default value is 10000.
      • doris.sink.batch.interval.ms: interval between two consecutive batch writes, in milliseconds. The default value is 50.
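The HTTPS note and the tuning parameters above can be combined when generating the view definition. The Python sketch below assembles the CREATE TEMPORARY VIEW statement from step 2, includes the two HTTPS options only when HTTPS is used, and adds sink.batch.size and doris.sink.batch.interval.ms only when they are overridden; it also estimates how many batches one writer sends. The function names view_ddl and batch_plan are hypothetical. Two assumptions are labeled: that the sink parameters are accepted in the same OPTIONS clause, and that the interval elapses once between consecutive batches, as the parameter description suggests. The estimate is per writer; with several Spark partitions, each writes its own batches. Option values are not escaped, so a password containing a double quote would break the generated statement.

```python
from typing import Optional, Tuple


def view_ddl(view: str, table: str, fe_host: str, port: int, user: str,
             password: str, https: bool = True,
             batch_size: Optional[int] = None,
             interval_ms: Optional[int] = None) -> str:
    """Build the CREATE TEMPORARY VIEW statement used in step 2."""
    options = [
        ("table.identifier", table),
        ("fenodes", f"{fe_host}:{port}"),  # https_port or http_port of the FE
        ("user", user),
        ("password", password),
    ]
    if https:
        # Omitted when the cluster uses HTTP, per the note above.
        options += [("doris.enable.https", "true"),
                    ("doris.ignore.https.ca", "true")]
    # Assumption: the sink parameters are accepted in the same OPTIONS clause.
    if batch_size is not None:
        options.append(("sink.batch.size", str(batch_size)))
    if interval_ms is not None:
        options.append(("doris.sink.batch.interval.ms", str(interval_ms)))
    body = ",\n".join(f'  "{k}"="{v}"' for k, v in options)
    return f"CREATE TEMPORARY VIEW {view}\nUSING doris\nOPTIONS(\n{body}\n);"


def batch_plan(rows: int, batch_size: int = 10000,
               interval_ms: int = 50) -> Tuple[int, int]:
    """Return (number of batches, minimum total pause in ms) for one writer."""
    if rows < 0 or batch_size <= 0 or interval_ms < 0:
        raise ValueError("invalid arguments")
    batches = (rows + batch_size - 1) // batch_size  # integer ceiling
    # Assumes one interval between each pair of consecutive batches.
    return batches, max(batches - 1, 0) * interval_ms
```

For example, at the default batch size of 10000, a single writer handling 1,000,000 rows sends 100 batches; with sink.batch.size set to 100000, it sends 10.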