
Detailed Development Description

Updated at: Mar 17, 2020 GMT+08:00

Prerequisites

A datasource connection has been created on the DLI management console.

Construct dependency information and create a Spark session.

  1. Import dependencies

    The following Maven dependency is required:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
    Import statements:
    import java.util.Properties
    import org.apache.spark.sql.{Row,SparkSession}
    import org.apache.spark.sql.SaveMode
    
  2. Create a session
    val sparkSession = SparkSession.builder().getOrCreate()
    

Connecting to Datasources Through SQL APIs

  1. Create a table to connect to the RDS datasource
    sparkSession.sql(
      """CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (
         'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306',
         'dbtable'='test.customer',
         'user'='root',
         'password'='######',
         'driver'='com.mysql.jdbc.Driver')""")
    
    Table 1 Parameters for creating a table

    url

    Specifies the RDS connection address. You need to create a datasource connection first. For details about how to create a datasource connection on the management console, see Basic Datasource Connections and Enhanced Datasource Connections in the Data Lake Insight User Guide.

    After a basic datasource connection is created, the returned IP address is used.

    After an enhanced datasource connection is created, use the internal network domain name or internal network address and database port number provided by RDS to connect to DLI. If MySQL is used, the format is protocol header://internal IP address:internal network port number. If PostgreSQL is used, the format is protocol header://internal IP address:internal network port number/database name.

    For example: jdbc:mysql://192.168.0.193:3306 or jdbc:postgresql://192.168.0.193:3306/postgres. For details about how to obtain the value, see RDS cluster information.

    NOTE:

    The default format of a datasource connection address is protocol header://IP address:port number.

    For example: jdbc:mysql://to-rds-1174405119-oLRHAGE7.datasource.com:3306

    To connect to an RDS PostgreSQL cluster, change the protocol header in the connection address to jdbc:postgresql and append /database name to the address.

    For example: jdbc:postgresql://to-rds-1174405119-oLRHAGE7.datasource.com:3306/postgreDB

    user

    RDS database username.

    password

    RDS database password.

    dbtable

    To connect to a MySQL cluster, enter database name.table name. To connect to a PostgreSQL cluster, enter schema name.table name.

    driver

    JDBC driver class name. To connect to a MySQL cluster, enter com.mysql.jdbc.Driver. To connect to a PostgreSQL cluster, enter org.postgresql.Driver.

    partitionColumn

    Name of the numeric column used to partition the table when data is read concurrently.

    NOTE:

    1. The partitionColumn, lowerBound, upperBound, and numPartitions parameters must be set at the same time.

    2. To improve the concurrent read performance, you are advised to use auto-increment columns.

    lowerBound

    Minimum value of a column specified by partitionColumn. The value is contained in the returned result.

    upperBound

    Maximum value of a column specified by partitionColumn. The value is not contained in the returned result.

    numPartitions

    Number of concurrent read operations.

    NOTE:

    When data is read, lowerBound and upperBound are evenly allocated to each task to obtain data. Example:

    'partitionColumn'='id',

    'lowerBound'='0',

    'upperBound'='100',

    'numPartitions'='2'

    DLI starts two concurrent tasks: one reads the rows whose id value is greater than or equal to 0 and less than 50, and the other reads the rows whose id value is greater than or equal to 50 and less than 100. (A sketch that adds these options to the CREATE TABLE statement follows this table.)

    fetchsize

    Number of data records obtained in each batch during data reading. The default value is 1000. A larger value improves read performance but occupies more memory; if the value is too large, memory overflow may occur.

    batchsize

    Number of data records written in each batch. The default value is 1000. A larger value improves write performance but occupies more memory; if the value is too large, memory overflow may occur.

    truncate

    Indicates whether to truncate the table, instead of dropping and recreating it, when overwrite is executed. The options are as follows:

    true

    false

    The default value is false, indicating that the original table is dropped and a new table is created when the overwrite operation is performed.

    isolationLevel

    Transaction isolation level. The options are as follows:

    NONE

    READ_UNCOMMITTED

    READ_COMMITTED

    REPEATABLE_READ

    SERIALIZABLE

    The default value is READ_UNCOMMITTED.

    Figure 1 RDS cluster information
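
    For example, the concurrent-read options described above can be added to the same CREATE TABLE statement. The following is a minimal sketch that reuses the connection address, credentials, and table from the example above; the table name dli_to_rds_partitioned and the option values are illustrative assumptions only.

    // Sketch only: create the datasource table with concurrent-read options.
    // "id" is assumed to be a numeric column of test.customer; the bounds,
    // partition count, and fetch size are illustrative values.
    sparkSession.sql(
      """CREATE TABLE IF NOT EXISTS dli_to_rds_partitioned USING JDBC OPTIONS (
         'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306',
         'dbtable'='test.customer',
         'user'='root',
         'password'='######',
         'driver'='com.mysql.jdbc.Driver',
         'partitionColumn'='id',
         'lowerBound'='0',
         'upperBound'='100',
         'numPartitions'='2',
         'fetchsize'='1000')""")
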
  2. Insert data
    sparkSession.sql("insert into dli_to_rds values(1, 'John',24),(2, 'Bob',32)")
    
  3. Query data
    val dataFrame = sparkSession.sql("select * from dli_to_rds")
    dataFrame.show()
    


  4. Delete the datasource connection table
    sparkSession.sql("drop table dli_to_rds")
    

Connecting to Datasources Through DataFrame APIs

  1. Configure datasource connection parameters
    val url = "jdbc:mysql://to-rds-1174405057-EA1Kgo8H.datasource.com:3306"
    val username = "root"
    val password = "######"
    val dbtable = "test.customer"
    
  2. Create a DataFrame, add data, and rename fields
    val dataFrame_1 = sparkSession.createDataFrame(List((8, "Jack_1", 18)))
    val df = dataFrame_1.withColumnRenamed("_1", "id")
                        .withColumnRenamed("_2", "name")
                        .withColumnRenamed("_3", "age")
    
  3. Import data to RDS
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .option("driver", "com.mysql.jdbc.Driver")
      .mode(SaveMode.Append)
      .save()
    

    The value of SaveMode can be one of the following:

    • ErrorIfExists: If the data already exists, the system throws an exception.
    • Overwrite: If the data already exists, the original data is overwritten. (A sketch combining Overwrite with the truncate option follows this list.)
    • Append: If the data already exists, the new data is appended.
    • Ignore: If the data already exists, no operation is performed. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
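
    For example, the following sketch combines SaveMode.Overwrite with the truncate option described in Table 1, so that the existing table is emptied rather than dropped and recreated. It reuses the connection parameters defined in this section; treat it as a sketch, not a required step.

    // Sketch only: overwrite the existing data while keeping the table definition.
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("truncate", "true")
      .mode(SaveMode.Overwrite)
      .save()
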
  4. Read data from RDS
    • Method 1: read.format()
      val jdbcDF = sparkSession.read.format("jdbc")
                      .option("url", url)
                      .option("dbtable", dbtable)
                      .option("user", username)
                      .option("password", password)
                      .option("driver", "com.mysql.jdbc.Driver")
                      .load()
      
    • Method 2: read.jdbc()
      val properties = new Properties()
      properties.put("user", username)
      properties.put("password", password)
      val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties)
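
    In addition to the two methods above, read.jdbc() provides an overload that reads the table in parallel using the partition parameters described in Table 1. The following is a minimal sketch that reuses the variables defined in this section and assumes that id is a numeric column of the table; the bounds and the number of partitions are illustrative values.

    // Sketch only: partitioned read through read.jdbc().
    val jdbcDF3 = sparkSession.read.jdbc(url, dbtable, "id", 0L, 100L, 2, properties)
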
      


    The DataFrame read by using the read.format() or read.jdbc() method can be registered as a temporary table. Then, you can use SQL statements to query data.

    jdbcDF.registerTempTable("customer_test")
    sparkSession.sql("select * from customer_test where id = 1").show()
    

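
    registerTempTable is deprecated in Spark 2.x. If you prefer the current API, createOrReplaceTempView can be used instead; the following minimal sketch is equivalent to the statements above.

    // Sketch only: register the DataFrame with the non-deprecated Spark 2.x API.
    jdbcDF.createOrReplaceTempView("customer_test")
    sparkSession.sql("select * from customer_test where id = 1").show()
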

DataFrame-related Operations

The data created by the createDataFrame() method and the data queried by the read.format() and read.jdbc() methods are all DataFrame objects, and you can operate on them directly. (In step 4, the DataFrame is also registered as a temporary table so that it can be queried with SQL statements.)

  • where

    The where clause accepts filter conditions, which can be combined using AND and OR, and returns the filtered DataFrame object. The following is an example:

    jdbcDF.where("id = 1 or age <=10").show()
    

  • filter

    filter is used in the same way as where and returns the filtered DataFrame object.

    jdbcDF.filter("id = 1 or age <=10").show()
    

  • select

    select returns a DataFrame object that contains only the specified fields. Multiple fields can be specified.

    • Example 1:
      jdbcDF.select("id").show()
      

    • Example 2:
      jdbcDF.select("id", "name").show()
      

    • Example 3:
      jdbcDF.select("id","name").where("id<4").show()
      

  • selectExpr

    selectExpr is used to perform special processing on a field, such as renaming it or transforming its value.

    For example, to rename the name field to name_test and add 1 to the value of age, run the following statement:

    jdbcDF.selectExpr("id", "name as name_test", "age+1").show()
    
  • col

    col is used to obtain a specified field. Unlike select, col returns a Column object, and only one field can be obtained at a time. The following is an example (a usage sketch follows this list):

    val idCol = jdbcDF.col("id")
    
  • drop

    drop is used to remove a specified field. Specify the field to remove (only one field can be removed at a time); a DataFrame object that does not contain the field is returned. The following is an example:

    jdbcDF.drop("id").show()
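
The Column object returned by col can be passed to other DataFrame operations such as select and where, as mentioned in the col item above. The following minimal sketch uses the idCol value obtained there; it is illustrative only.

    // Sketch only: use the Column returned by col() in select and where.
    jdbcDF.select(idCol).show()
    jdbcDF.where(idCol === 1).show()
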
    

Submitting a Spark Job

  1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
  2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Session (Recommended) and Creating a Batch Processing Job in the Data Lake Insight API Reference.
    • When submitting a job, you need to specify a dependency module named sys.datasource.rds.
    • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
    • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Session and Creating a Batch Processing Job in the Data Lake Insight API Reference.
