Scala Example Code

Development description

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see the Data Lake Insight User Guide.

  • Construct dependency information and create a Spark session.
    1. Import dependencies

      Maven dependency

      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      Import statements
      import java.util.Properties
      import org.apache.spark.sql.{Row,SparkSession}
      import org.apache.spark.sql.SaveMode
      
    2. Create a session
      val sparkSession = SparkSession.builder().getOrCreate()
      
  • Connecting to datasources through SQL APIs
    1. Create a table to connect to the DWS datasource
      sparkSession.sql(
        """CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
          |  'url'='jdbc:postgresql://to-dws-1174404209-cA37siB6.datasource.com:8000/postgres',
          |  'dbtable'='customer',
          |  'user'='dbadmin',
          |  'password'='######')""".stripMargin)
      
      Table 1 Parameters for creating a table

      Parameter

      Description

      url

      JDBC URL of the DWS cluster. To obtain the DWS address, first create a datasource connection. For details, see the Data Lake Insight User Guide.

      After a basic datasource connection is created, use the returned IP address.

      After an enhanced datasource connection is created, use either the JDBC connection string (intranet) provided by DWS or the intranet IP address and port number of the DWS cluster. The format is protocol header://internal IP address:internal network port number/database name, for example: jdbc:postgresql://192.168.0.77:8000/postgres. For details about how to obtain the value, see Figure 1 (DWS cluster information).

      NOTE:

      The DWS IP address is in the following format: protocol header://IP address:port number/database name

      Example:

      jdbc:postgresql://to-dws-1174405119-ihlUr78j.datasource.com:8000/postgres

      If you want to connect to a database created in DWS, change postgres to the corresponding database name in this connection.

      user

      Username of the DWS data warehouse.

      password

      Password of the DWS data warehouse.

      dbtable

      Name of the table in the PostgreSQL database.

      partitionColumn

      Numeric field used to split the data into ranges that are read concurrently.

      NOTE:
      • The partitionColumn, lowerBound, upperBound, and numPartitions parameters must be set at the same time.
      • To improve the concurrent read performance, you are advised to use auto-increment columns.

      lowerBound

      Minimum value of a column specified by partitionColumn. The value is contained in the returned result.

      upperBound

      Maximum value of a column specified by partitionColumn. The value is not contained in the returned result.

      numPartitions

      Number of concurrent read operations.

      NOTE:

      When data is read, the range between lowerBound and upperBound is divided evenly among the tasks. Example:

      'partitionColumn'='id',
      'lowerBound'='0',
      'upperBound'='100',
      'numPartitions'='2'

      DLI starts two concurrent tasks. One task reads the rows whose id is greater than or equal to 0 and less than 50. The other task reads the rows whose id is greater than or equal to 50 and less than 100.

      fetchsize

      Number of data records obtained in each batch during data reading. The default value is 1000. A larger value improves performance but occupies more memory. An excessively large value may cause memory overflow.

      batchsize

      Number of data records written in each batch. The default value is 1000. A larger value improves performance but occupies more memory. An excessively large value may cause memory overflow.

      truncate

      Indicates whether to clear the data in the table without deleting the table itself when an overwrite operation is performed. The options are as follows:

      • true
      • false

      The default value is false, indicating that the original table is deleted and then a new table is created when the overwrite operation is performed.

      isolationLevel

      Transaction isolation level. The options are as follows:

      • NONE
      • READ_UNCOMMITTED
      • READ_COMMITTED
      • REPEATABLE_READ
      • SERIALIZABLE

      The default value is READ_UNCOMMITTED.

      Figure 1 DWS cluster information
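The even split described in the numPartitions note can be illustrated with a short calculation. The following is a minimal sketch in plain Scala, not DLI or Spark code: PartitionRangesSketch and partitionRanges are hypothetical names introduced here, and the predicates that the engine actually generates for each task may differ in detail.

```scala
object PartitionRangesSketch {
  // Hypothetical helper: splits [lowerBound, upperBound) into numPartitions
  // half-open ranges, following the description in Table 1.
  def partitionRanges(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lowerBound < upperBound, "lowerBound must be less than upperBound")
    val stride = (upperBound - lowerBound) / numPartitions
    (0 until numPartitions).map { i =>
      val start = lowerBound + i * stride
      // The last range absorbs any remainder so that it ends at upperBound.
      val end = if (i == numPartitions - 1) upperBound else start + stride
      (start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Same values as the example in Table 1.
    partitionRanges(0L, 100L, 2).foreach { case (start, end) =>
      println(s"id >= $start AND id < $end")
    }
    // prints:
    //   id >= 0 AND id < 50
    //   id >= 50 AND id < 100
  }
}
```

The sketch also shows why an evenly distributed column such as an auto-increment ID is advised: each range then covers roughly the same number of rows.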
    2. Insert data
      sparkSession.sql("insert into dli_to_dws values(1, 'John',24),(2, 'Bob',32)")
      
    3. Query data
      val dataFrame = sparkSession.sql("select * from dli_to_dws")
      dataFrame.show()
      


    4. Delete the datasource connection table
      sparkSession.sql("drop table dli_to_dws")
      
  • Connecting to datasources through DataFrame APIs
    1. Configure datasource connection
      val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres"
      val username = "dbadmin"
      val password = "######"
      val dbtable = "customer"
      
    2. Create a DataFrame, add data, and rename fields
      val dataFrame_1 = sparkSession.createDataFrame(List((8, "Jack_1", 18)))
      // createDataFrame() names tuple fields _1, _2, and _3; rename them to id, name, and age.
      val df = dataFrame_1.withColumnRenamed("_1", "id")
                          .withColumnRenamed("_2", "name")
                          .withColumnRenamed("_3", "age")
      
    3. Import data to DWS
      df.write.format("jdbc")
        .option("url", url)
        .option("dbtable", dbtable)
        .option("user", username)
        .option("password", password)
        .mode(SaveMode.Append)
        .save()
      

      The value of SaveMode can be one of the following:

      • ErrorIfExists: If the data already exists, the system throws an exception.
      • Overwrite: If the data already exists, the original data is overwritten.
      • Append: If the data already exists, the new data is appended to it.
      • Ignore: If the data already exists, the new data is not saved and the existing data is left unchanged. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
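The truncate parameter in Table 1 takes effect on this kind of write when the mode is Overwrite. The following is a minimal sketch that assumes the standard Spark JDBC writer options. OverwriteSketch and truncatingWriter are hypothetical names introduced here, and whether the table is actually truncated rather than re-created can depend on the engine and the JDBC driver.

```scala
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SaveMode}

object OverwriteSketch {
  // Hypothetical helper: configures a JDBC writer that replaces the rows of
  // the target table while asking the engine to keep the table itself.
  def truncatingWriter(df: DataFrame, url: String, dbtable: String,
                       user: String, password: String): DataFrameWriter[Row] =
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", user)
      .option("password", password)
      .option("truncate", "true") // clear the rows instead of dropping and re-creating the table
      .mode(SaveMode.Overwrite)
}
```

With the df, url, dbtable, username, and password values from the preceding steps, calling OverwriteSketch.truncatingWriter(df, url, dbtable, username, password).save() would perform the write. Nothing is sent to DWS until save() is called.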
    4. Read data from DWS
      • Method 1: read.format()
        val jdbcDF = sparkSession.read.format("jdbc")
                         .option("url", url)
                         .option("dbtable", dbtable)
                         .option("user", username)
                         .option("password", password)
                         .load()
        
      • Method 2: read.jdbc()
        val properties = new Properties()
        properties.put("user", username)
        properties.put("password", password)
        val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties)
        


      Register the DataFrame read by read.format() or read.jdbc() as a temporary view. You can then use SQL statements to query the data.

      jdbcDF.createOrReplaceTempView("customer_test")
      sparkSession.sql("select * from customer_test where id = 1").show()
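The partitionColumn, lowerBound, upperBound, and numPartitions parameters from Table 1 can also be supplied to the DataFrame reader. The following is a minimal sketch in plain Scala that builds the four options together, since they must be set at the same time. PartitionedReadSketch and partitionOptions are hypothetical names, the sanity checks are assumptions of this sketch, and the reader call is shown only as a comment because it needs a live datasource connection.

```scala
object PartitionedReadSketch {
  // Hypothetical helper: returns the four partitioning options as one map,
  // so that none of them can be set without the others.
  def partitionOptions(column: String, lowerBound: Long, upperBound: Long,
                       numPartitions: Int): Map[String, String] = {
    require(column.nonEmpty, "partitionColumn must not be empty")
    require(lowerBound < upperBound, "lowerBound must be less than upperBound")
    require(numPartitions > 0, "numPartitions must be positive")
    Map(
      "partitionColumn" -> column,
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString
    )
  }

  def main(args: Array[String]): Unit = {
    val opts = partitionOptions("id", 0L, 100L, 2)
    opts.foreach { case (key, value) => println(s"'$key'='$value'") }
    // With the sparkSession, url, dbtable, username, and password values
    // defined in the preceding steps, the options could be applied like this:
    //   val partitionedDF = sparkSession.read.format("jdbc")
    //     .option("url", url)
    //     .option("dbtable", dbtable)
    //     .option("user", username)
    //     .option("password", password)
    //     .options(opts)
    //     .load()
  }
}
```

Passing the map through options() keeps the four values in one place, which makes it harder to set only some of them by mistake.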
      


  • DataFrame-related operations

    The data created by the createDataFrame() method and the data read by the read.format() and read.jdbc() methods are all DataFrame objects, so you can query them directly with the operations below. (Step 4 shows the alternative: registering the DataFrame so that it can be queried with SQL statements.)

    • where

      where filters rows by a condition expression. Conditions can be combined with AND and OR, and the filtered DataFrame object is returned. The following is an example:

      jdbcDF.where("id = 1 or age <=10").show()
      

    • filter

      filter is used in the same way as where. Pass a filter condition expression, and the filtered DataFrame object is returned. The following is an example:

      jdbcDF.filter("id = 1 or age <=10").show()
      

    • select

      select returns a DataFrame object that contains only the specified fields. Multiple fields can be specified.

      • Example 1:
        jdbcDF.select("id").show()
        

      • Example 2:
        jdbcDF.select("id", "name").show()
        

      • Example 3:
        jdbcDF.select("id","name").where("id<4").show()
        

    • selectExpr

      selectExpr is used to perform special processing on fields, such as renaming a field or computing an expression on it. For example, to rename the name field to name_test and add 1 to the value of age, run the following statement:

      jdbcDF.selectExpr("id", "name as name_test", "age+1").show()
      
    • col

      col returns a specified field as a Column object. Unlike select, which returns a DataFrame, col returns a Column and accepts only one field at a time. The following is an example:

      val idCol = jdbcDF.col("id")
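
A Column object is usually passed to other DataFrame operations rather than displayed on its own. The following is a minimal sketch that assumes a DataFrame with numeric id and age fields, as in the examples above. ColumnSketch and olderByOne are hypothetical names introduced here.

```scala
import org.apache.spark.sql.{Column, DataFrame}

object ColumnSketch {
  // Hypothetical helper: keeps the rows whose id is less than maxId and
  // returns the id together with age + 1, using Column objects instead of
  // SQL expression strings.
  def olderByOne(df: DataFrame, maxId: Int): DataFrame = {
    val idCol: Column = df.col("id")
    val ageCol: Column = df.col("age")
    df.filter(idCol < maxId)
      .select(idCol, (ageCol + 1).as("age_plus_one"))
  }
}
```

For example, ColumnSketch.olderByOne(jdbcDF, 4).show() would display the same kind of result as combining where("id<4") with selectExpr("id", "age+1").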
      
    • drop

      drop deletes a specified field and returns a DataFrame object that does not contain the field. In Spark 2.x, several field names can also be passed in one call. The following is an example:

      jdbcDF.drop("id").show()
      

  • Submitting a Spark job
    1. Generate a JAR package based on the code and upload the package to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Maven dependency
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • Connecting to datasources through SQL APIs
    import org.apache.spark.sql.SparkSession
    
    object Test_SQL_DWS {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession.
        val sparkSession = SparkSession.builder().getOrCreate()
        // Create a DLI table associated with the DWS customer table.
        sparkSession.sql(
          """CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
            |  'url'='jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres',
            |  'dbtable'='customer',
            |  'user'='dbadmin',
            |  'password'='######')""".stripMargin)
    
        //*****************************SQL model***********************************
        //Insert data into the DLI data table
        sparkSession.sql("insert into dli_to_dws values(1,'John',24),(2,'Bob',32)")
      
        //Read data from DLI data table
        val dataFrame = sparkSession.sql("select * from dli_to_dws")
        dataFrame.show()
      
        //drop table
        sparkSession.sql("drop table dli_to_dws")
    
        sparkSession.close()
      }
    }
    
  • Connecting to datasources through DataFrame APIs
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SaveMode
    
    object Test_SQL_DWS {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        //*****************************DataFrame model***********************************
        // Set the connection configuration parameters. Contains url, username, password, dbtable.
        val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres"
        val username = "dbadmin"
        val password = "######"
        val dbtable = "customer"
    
        // Create a DataFrame and initialize its data.
        val dataFrame_1 = sparkSession.createDataFrame(List((1, "Jack", 18)))

        // Rename the default field names (_1, _2, _3) assigned by createDataFrame().
        val df = dataFrame_1.withColumnRenamed("_1", "id")
                            .withColumnRenamed("_2", "name")
                            .withColumnRenamed("_3", "age")
    
        // Write data to the DWS table specified by dbtable (customer).
        df.write.format("jdbc")
          .option("url", url) 
          .option("dbtable", dbtable) 
          .option("user", username) 
          .option("password", password) 
          .mode(SaveMode.Append) 
          .save()
    
        // Manipulate data with DataFrame operations.
        // Keep only the rows whose id is not 1.
        val newDF = df.filter("id!=1")
        newDF.show()

        // Drop the id column.
        val newDF_1 = df.drop("id")
        newDF_1.show()
    
        // Read the data of the customer table in DWS.
        //Way one: Read data from DWS using read.format()
        val jdbcDF = sparkSession.read.format("jdbc")
                        .option("url", url)
                        .option("dbtable", dbtable)
                        .option("user", username)
                        .option("password", password)
                        .option("driver", "org.postgresql.Driver")
                        .load()
        //Way two: Read data from DWS using read.jdbc()
        val properties = new Properties()
        properties.put("user", username)
        properties.put("password", password)
        val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties)
    
        /**
         * Register the DataFrame read by read.format() or read.jdbc() as a temporary view, and query the data
         * using SQL statements.
         */
        jdbcDF.createOrReplaceTempView("customer_test")
        val result = sparkSession.sql("select * from customer_test where id = 1")
        result.show()
    
        sparkSession.close()
      }
    }