
Complete Example Code


Maven Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
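The examples below load the MySQL JDBC driver class com.mysql.jdbc.Driver. If the driver is not already provided by your runtime, you may also need a dependency along these lines (the version shown is an assumption; pick one matching your RDS MySQL instance):

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>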

Connecting to Datasources Through SQL APIs

import org.apache.spark.sql.SparkSession

object Test_SQL_RDS {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val sparkSession = SparkSession.builder().getOrCreate()
  
    // Create a data table for DLI-associated RDS
    sparkSession.sql(
      """CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (
        |  'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306',
        |  'dbtable'='test.customer',
        |  'user'='root',
        |  'password'='######',
        |  'driver'='com.mysql.jdbc.Driver')""".stripMargin)

    //*****************************SQL model***********************************
    // Insert data into the DLI data table
    sparkSession.sql("insert into dli_to_rds values(1,'John',24),(2,'Bob',32)")
  
    // Read data from the DLI data table
    val dataFrame = sparkSession.sql("select * from dli_to_rds")
    dataFrame.show()
  
    // Drop the table
    sparkSession.sql("drop table dli_to_rds")

    sparkSession.close()
  }
}
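With the two rows inserted above, dataFrame.show() should print output along these lines (the column names id, name, and age are an assumption about the test.customer schema, consistent with the DataFrame example below):

+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 24|
|  2| Bob| 32|
+---+----+---+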

Connecting to Datasources Through DataFrame APIs

import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

object Test_SQL_RDS {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val sparkSession = SparkSession.builder().getOrCreate()

    //*****************************DataFrame model***********************************
    // Set the connection parameters: url, username, password, and dbtable.
    val url = "jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306"
    val username = "root"
    val password = "######"
    val dbtable = "test.customer"

    // Create a DataFrame and initialize it with a single row of data.
    val dataFrame_1 = sparkSession.createDataFrame(List((1, "Jack", 18)))
  
    // Rename the default tuple columns (_1, _2, _3) created by createDataFrame().
    val df = dataFrame_1.withColumnRenamed("_1", "id")
                        .withColumnRenamed("_2", "name")
                        .withColumnRenamed("_3", "age")

    // Write data to the rds_table_1 table
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .option("driver", "com.mysql.jdbc.Driver")
      .mode(SaveMode.Append)
      .save()

    // Manipulate data through the DataFrame object
    // Filter out the row whose id is 1
    val newDF = df.filter("id != 1")
    newDF.show()
  
    // Drop the id column
    val newDF_1 = df.drop("id")
    newDF_1.show()

    // Read the data of the customer table in the RDS database
    // Option 1: read data from RDS using read.format()
    val jdbcDF = sparkSession.read.format("jdbc")
                    .option("url", url)
                    .option("dbtable", dbtable)
                    .option("user", username)
                    .option("password", password)
                    .option("driver", "com.mysql.jdbc.Driver")
                    .load()
    // Option 2: read data from RDS using read.jdbc()
    val properties = new Properties()
    properties.put("user", username)
    properties.put("password", password)
    val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties)

    /**
     * Register the DataFrame read by read.format() or read.jdbc() as a temporary view, and query the data
     * using an SQL statement.
     */
    jdbcDF.createOrReplaceTempView("customer_test")
    val result = sparkSession.sql("select * from customer_test where id = 1")
    result.show()

    sparkSession.close()
  }
}
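The write above uses SaveMode.Append, which adds the new rows to the existing table. Other Spark save modes can be substituted on the same builder; a minimal variant using SaveMode.Overwrite, which for JDBC sources drops and recreates the target table by default:

    // Replace the contents of the target table instead of appending to it.
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .option("driver", "com.mysql.jdbc.Driver")
      .mode(SaveMode.Overwrite)
      .save()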

DataFrame-related Operations

  // The where() method uses "and" and "or" for conditional filtering, returning a filtered DataFrame object.
  jdbcDF.where("id = 1 or age <=10").show()

  // The filter() method is used in the same way as the where() method.
  jdbcDF.filter("id = 1 or age <=10").show()

  // The select() method passes multiple arguments and returns the DataFrame object of the specified field.
  jdbcDF.select("id").show()
  jdbcDF.select("id", "name").show()
  jdbcDF.select("id","name").where("id<4").show()

  /**
   * The selectExpr() method applies SQL expressions to fields, such as renaming a column
   * or computing a new value from it (for example, age+1).
   */
  jdbcDF.selectExpr("id", "name as name_test", "age+1").show()

  // The col() method gets one specified field at a time; the return type is Column.
  val idCol = jdbcDF.col("id")

  /**
   * The drop() method returns a DataFrame object without the specified field; only one field
   * can be dropped per call.
   */
  jdbcDF.drop("id").show()
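The Column object returned by col() can also be used directly in expression-based filters and selections. A minimal sketch using the idCol obtained above (the derived column age+1 is just an illustration):

  // Expression-based filter, equivalent to the string form jdbcDF.filter("id > 1")
  jdbcDF.filter(idCol > 1).show()

  // Column objects also work inside select(), e.g. to compute a derived value
  jdbcDF.select(idCol, jdbcDF.col("age") + 1).show()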
