Updated on 2026-03-10 GMT+08:00

PySpark Example Code

Scenario

This section provides PySpark example code that demonstrates how to use a Spark job to access data from the DWS data source.

Before running the example, make sure that a datasource connection has been created and bound to a queue on the DLI management console. For details, see Enhanced Datasource Connections.

Hard coding passwords or storing them in code in plaintext poses significant security risks. You are advised to store them in encrypted form in configuration files or environment variables and decrypt them when needed to ensure security.

You can also use DEW to manage access credentials for data sources.
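
As a minimal sketch of the advice above, the user name and password can be read from environment variables at run time instead of being written into the script. The variable names DWS_USER and DWS_PASSWORD and the helper load_dws_credentials are hypothetical and chosen only for illustration. The sketch does not cover decryption or DEW, which depend on your environment.

```python
import os


def load_dws_credentials(env=None):
    """Return (user, password) taken from environment variables.

    DWS_USER and DWS_PASSWORD are hypothetical names used for this sketch.
    """
    env = os.environ if env is None else env
    user = env.get("DWS_USER")
    password = env.get("DWS_PASSWORD")
    if not user or not password:
        # Fail early instead of sending an empty credential to the database.
        raise RuntimeError("DWS_USER and DWS_PASSWORD must both be set")
    return user, password
```

The returned values could then replace the literal user and password strings used in the steps that follow.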

Preparations

  1. Import dependency packages.
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    
  2. Create a session.
    sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
    

Accessing a Data Source Using a DataFrame API

  1. Set connection parameters.
    url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
    dbtable = "customer"
    user = "dbadmin"
    password = "######"
    driver = "org.postgresql.Driver"
    
  2. Set data.
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
    
  3. Configure the schema.
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False),
                         StructField("age", IntegerType(), False)])
    
  4. Create a DataFrame.
    dataFrame = sparkSession.createDataFrame(dataList, schema)
    
  5. Save the data to DWS.
    dataFrame.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .mode("Overwrite") \
        .save()
    

    The mode can be set to one of the following values:

    • ErrorIfExists: If the data already exists, the system throws an exception.
    • Overwrite: If the data already exists, the original data is overwritten.
    • Append: If the data already exists, the new data is added to it.
    • Ignore: If the data already exists, the new data is not saved and the existing data is left unchanged. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
  6. Read data from DWS.
    jdbcDF = sparkSession.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()
    jdbcDF.show()
    
  7. View the operation result.
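
The four save modes described in step 5 can be modeled in plain Python to make their differences concrete. This is only an illustrative model of the behavior listed above, not how Spark implements it. The function apply_save_mode and the use of lists to stand in for table rows are assumptions made for this sketch.

```python
SAVE_MODES = ("errorifexists", "overwrite", "append", "ignore")


def apply_save_mode(existing_rows, new_rows, mode):
    """Model what each save mode does to a target table.

    existing_rows is None when the target does not exist yet.
    Returns the rows the target would hold after the write.
    """
    mode = mode.lower()
    if mode not in SAVE_MODES:
        raise ValueError("unknown save mode: %s" % mode)
    if existing_rows is None:
        # Nothing to conflict with: every mode simply writes the new rows.
        return list(new_rows)
    if mode == "errorifexists":
        raise ValueError("target already exists")
    if mode == "overwrite":
        return list(new_rows)
    if mode == "append":
        return list(existing_rows) + list(new_rows)
    # mode == "ignore": keep the existing data and drop the new rows.
    return list(existing_rows)
```

For example, with one existing row and one new row, Append yields both rows, Overwrite yields only the new row, and Ignore yields only the existing row.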

Accessing a Data Source Using a SQL API

  1. Create a table to connect to a DWS data source.
    sparkSession.sql("""
        CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
            'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',
            'dbtable'='customer',
            'user'='dbadmin',
            'password'='######',
            'driver'='org.postgresql.Driver')
    """)
    

    For details about table creation parameters, see Table 1.

  2. Insert data.
    sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
    
  3. Query data.
    sparkSession.sql("select * from dli_to_dws").show()
    
  4. View the operation result.
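
The CREATE TABLE statement in step 1 can also be assembled from a dictionary of options, which avoids repeating long string literals. The following is a minimal sketch. The helper build_create_table_sql is hypothetical, and it assumes that rejecting values containing a single quote or backslash is acceptable, rather than attempting to escape them.

```python
def build_create_table_sql(table, options):
    """Build a CREATE TABLE ... USING JDBC statement from an options dict."""
    pairs = []
    # Sort the keys so the generated statement is deterministic.
    for key, value in sorted(options.items()):
        text = str(value)
        if "'" in text or "\\" in text:
            # Refuse values that could break out of the quoted SQL literal.
            raise ValueError("option %r contains a quote or backslash" % key)
        pairs.append("'%s'='%s'" % (key, text))
    return "CREATE TABLE IF NOT EXISTS %s USING JDBC OPTIONS (%s)" % (
        table, ", ".join(pairs))
```

The resulting string could be passed to sparkSession.sql() in place of the literal statement.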

Submitting a Spark Job

  1. Upload the Python code file to the OBS bucket.
  2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.
    • For Spark 2.3.2 (soon to be taken offline) or 2.4.5, set Module to sys.datasource.dws when submitting a job.
    • If the Spark version is 3.1.1 or later, you do not need to select a module. Instead, set the following Spark parameters (--conf):

      spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*

      spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*

    • For how to submit a job on the console, see table "Parameters for selecting dependency resources" in Creating a Spark Job.
    • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
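
The version-dependent settings above can be summarized in a small helper. This is a sketch only: the function names and the shape of the returned dictionary (keys "modules" and "conf") are assumptions for illustration, not the request format of any API. Versions that the list above does not mention are rejected rather than guessed.

```python
DWS_JAR_PATH = "/usr/share/extension/dli/spark-jar/datasource/dws/*"


def dws_dependency_settings(spark_version):
    """Return the module list and Spark conf for a Spark version string."""
    version = tuple(int(part) for part in spark_version.split("."))
    if version in ((2, 3, 2), (2, 4, 5)):
        # Older versions load the DWS dependency through a module.
        return {"modules": ["sys.datasource.dws"], "conf": {}}
    if version >= (3, 1, 1):
        # Newer versions put the DWS JARs on the class path instead.
        return {
            "modules": [],
            "conf": {
                "spark.driver.extraClassPath": DWS_JAR_PATH,
                "spark.executor.extraClassPath": DWS_JAR_PATH,
            },
        }
    raise ValueError("Spark version %s is not covered" % spark_version)


def as_conf_lines(conf):
    """Render a conf dict as key=value lines, sorted by key."""
    return ["%s=%s" % (key, value) for key, value in sorted(conf.items())]
```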

Complete Example Code

  • Connecting to data sources through DataFrame APIs

    Hard coding passwords or storing them in code in plaintext poses significant security risks. You are advised to store them in encrypted form in configuration files or environment variables and decrypt them when needed to ensure security.

    # -*- coding: utf-8 -*-
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # Create a SparkSession.
        sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()

        # Set the datasource connection parameters.
        url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
        dbtable = "customer"
        user = "dbadmin"
        password = "######"
        driver = "org.postgresql.Driver"

        # Create an RDD that holds the initial data.
        dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])

        # Define the schema.
        schema = StructType([StructField("id", IntegerType(), False),
                             StructField("name", StringType(), False),
                             StructField("age", IntegerType(), False)])

        # Create a DataFrame from the RDD and the schema.
        dataFrame = sparkSession.createDataFrame(dataList, schema)

        # Write the data to the DWS table.
        dataFrame.write \
            .format("jdbc") \
            .option("url", url) \
            .option("dbtable", dbtable) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .mode("Overwrite") \
            .save()

        # Read the data back from the DWS table.
        jdbcDF = sparkSession.read \
            .format("jdbc") \
            .option("url", url) \
            .option("dbtable", dbtable) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .load()
        jdbcDF.show()

        # Stop the session.
        sparkSession.stop()
    
  • Connecting to data sources through SQL APIs
    # -*- coding: utf-8 -*-
    from __future__ import print_function
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # Create a SparkSession.
        sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()

        # Create a DLI table that is associated with the DWS table.
        sparkSession.sql("""
            CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
                'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',
                'dbtable'='customer',
                'user'='dbadmin',
                'password'='######',
                'driver'='org.postgresql.Driver')
        """)

        # Insert data into the DLI table.
        sparkSession.sql("insert into dli_to_dws values(2,'John',24)")

        # Read data from the DLI table and print it.
        sparkSession.sql("select * from dli_to_dws").show()

        # Stop the session.
        sparkSession.stop()
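
Both complete examples repeat the same connection settings. As one possible refactoring, the settings can be collected into a dictionary once and reused. The helper build_jdbc_options and its parameters are hypothetical names introduced for this sketch. The URL layout follows the jdbc:postgresql://host:port/database form used in the examples.

```python
def build_jdbc_options(host, port, database, table, user, password):
    """Collect the JDBC settings used by the examples into one dict."""
    return {
        "url": "jdbc:postgresql://%s:%d/%s" % (host, port, database),
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


# Possible use in a Spark job (not executed here):
#   opts = build_jdbc_options(host, 8000, "postgres", "customer", user, password)
#   dataFrame.write.format("jdbc").options(**opts).mode("Overwrite").save()
#   sparkSession.read.format("jdbc").options(**opts).load().show()
```

Passing the dictionary through options(**opts) sets the same keys as the chained option() calls in the DataFrame example.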