Help Center > Developer Guide > Developing a DLI Datasource Connection Using a Spark Job > Interconnecting with DWS (By Scala) > Complete Example Code

Complete Example Code

Updated at: Mar 17, 2020 GMT+08:00

Maven Dependency

1
2
3
4
5
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>

Connecting to Datasources Through SQL APIs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.Properties
import org.apache.spark.sql.SparkSession

object Test_SQL_DWS {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()

    // Create a DLI table associated with the DWS "customer" table.
    // NOTE: the SQL must be a single Scala string. A plain double-quoted
    // literal cannot span source lines, so use a triple-quoted string with
    // stripMargin to keep the statement readable and the code compilable.
    sparkSession.sql(
      """CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
        |  'url'='jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres',
        |  'dbtable'='customer',
        |  'user'='dbadmin',
        |  'password'='######')""".stripMargin)

    //*****************************SQL model***********************************
    // Insert data into the DLI data table.
    sparkSession.sql("insert into dli_to_dws values(1,'John',24),(2,'Bob',32)")

    // Read data back from the DLI data table and print it.
    val dataFrame = sparkSession.sql("select * from dli_to_dws")
    dataFrame.show()

    // Drop the DLI table (removes the association, not the DWS data).
    // NOTE(review): presumably only the DLI-side association is dropped —
    // confirm against the DLI documentation.
    sparkSession.sql("drop table dli_to_dws")

    sparkSession.close()
  }
}

Connecting to Datasources Through DataFrame APIs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

object Test_SQL_DWS {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().getOrCreate()

    //*****************************DataFrame model***********************************
    // Connection configuration parameters: url, username, password, dbtable.
    val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres"
    val username = "dbadmin"
    val password = "######"
    val dbtable = "customer"

    // Create a DataFrame with a single seed row (id, name, age).
    // val instead of var: the reference is never reassigned.
    val dataFrame_1 = sparkSession.createDataFrame(List((1, "Jack", 18)))

    // Rename the tuple columns (_1, _2, _3) produced by createDataFrame().
    val df = dataFrame_1.withColumnRenamed("_1", "id")
      .withColumnRenamed("_2", "name")
      .withColumnRenamed("_3", "age")

    // Append the row to the DWS table via JDBC.
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .mode(SaveMode.Append)
      .save()

    // DataFrame operations on the local DataFrame:
    // Keep only rows whose id is NOT 1 (filter("id!=1") excludes id=1,
    // contrary to the original "Filter users with id=1" comment).
    val newDF = df.filter("id!=1")
    newDF.show()

    // Drop the id column.
    val newDF_1 = df.drop("id")
    newDF_1.show()

    // Read the data of the customer table from the DWS database.
    // Way one: read data from DWS using read.format("jdbc").
    val jdbcDF = sparkSession.read.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .option("driver", "org.postgresql.Driver")
      .load()

    // Way two: read data from DWS using read.jdbc() with a Properties bag.
    val properties = new Properties()
    properties.put("user", username)
    properties.put("password", password)
    val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties)

    /**
     * Register the DataFrame read by read.format() or read.jdbc() as a
     * temporary view and query it with SQL.
     * createOrReplaceTempView replaces registerTempTable, which has been
     * deprecated since Spark 2.0.
     */
    jdbcDF.createOrReplaceTempView("customer_test")
    val result = sparkSession.sql("select * from customer_test where id = 1")
    result.show()

    sparkSession.close()
  }
}

Did you find this page helpful?

Submitted successfully!

Thank you for your feedback. Your feedback helps make our documentation better.

Failed to submit the feedback. Please try again later.

Which of the following issues have you encountered?







Please complete at least one feedback item.

Content can contain a maximum of 200 characters.

Content is empty.

OK Cancel