Updated on 2024-06-13 GMT+08:00

PySpark Example Code

Scenario

This section provides PySpark example code that demonstrates how to use a Spark job to access data from the GaussDB(DWS) data source.

A datasource connection has been created and bound to a queue on the DLI management console.

Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

Preparations

  1. Import dependency packages.
    1
    2
    3
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    
  2. Create a session.
    1
    sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
    

Accessing a Data Source Using a DataFrame API

  1. Set connection parameters.
    1
    2
    3
    4
    5
    url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
    dbtable = "customer"
    user = "dbadmin"
    password = "######"
    driver = "org.postgresql.Driver"
    
  2. Set data.
    1
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
    
  3. Configure the schema.
    1
    2
    3
    schema = StructType([StructField("id", IntegerType(), False),\                
                         StructField("name", StringType(), False),\            
                         StructField("age", IntegerType(), False)])
    
  4. Create a DataFrame.
    1
    dataFrame = sparkSession.createDataFrame(dataList, schema)
    
  5. Save the data to GaussDB(DWS).
    1
    2
    3
    4
    5
    6
    7
    8
    9
    dataFrame.write \   
        .format("jdbc") \  
        .option("url", url) \  
        .option("dbtable", dbtable) \  
        .option("user", user) \ 
        .option("password", password) \ 
        .option("driver", driver) \ 
        .mode("Overwrite") \  
        .save()
    

    The options of mode can be one of the following:

    • ErrorIfExis: If the data already exists, the system throws an exception.
    • Overwrite: If the data already exists, the original data will be overwritten.
    • Append: If the data already exists, the system saves the new data.
    • Ignore: If the data already exists, no operation is required. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
  6. Read data from GaussDB(DWS).
    1
    2
    3
    4
    5
    6
    7
    8
    9
    jdbcDF = sparkSession.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()
    jdbcDF.show()
    
  7. View the operation result.

Accessing a Data Source Using a SQL API

  1. Create a table to connect to a GaussDB(DWS) data source.
    1
    2
    3
    4
    5
    6
    7
    sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
        'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\
        'dbtable'='customer',\
        'user'='dbadmin',\
        'password'='######',\
        'driver'='org.postgresql.Driver')")
    

    For details about table creation parameters, see Table 1.

  2. Insert data.
    1
    sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
    
  3. Query data.
    1
    jdbcDF = sparkSession.sql("select * from dli_to_dws").show()
    
  4. View the operation result.

Submitting a Spark Job

  1. Upload the Python code file to DLI.
  2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.
    • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, set Module to sys.datasource.hbase when you submit a job.
    • If the Spark version is 3.1.1, you do not need to select a module. Set Spark parameters (--conf).

      spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*

      spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*

    • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
    • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.

Complete Example Code

  • Connecting to data sources through DataFrame APIs

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.   
      sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
    
      # Set cross-source connection parameters  
      url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
      dbtable = "customer" 
      user = "dbadmin"
      password = "######"
      driver = "org.postgresql.Driver"
    
      # Create a DataFrame and initialize the DataFrame data.   
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
    
      # Setting schema   
      schema = StructType([StructField("id", IntegerType(), False),\     
                           StructField("name", StringType(), False),\    
                           StructField("age", IntegerType(), False)])
    
      # Create a DataFrame from RDD and schema   
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Write data to the DWS table  
      dataFrame.write \ 
          .format("jdbc") \    
          .option("url", url) \  
          .option("dbtable", dbtable) \    
          .option("user", user) \    
          .option("password", password) \   
          .option("driver", driver) \     
          .mode("Overwrite") \   
          .save()
    
      # Read data   
      jdbcDF = sparkSession.read \  
          .format("jdbc") \     
          .option("url", url) \  
          .option("dbtable", dbtable) \     
          .option("user", user) \     
          .option("password", password) \   
          .option("driver", driver) \  
          .load()  
      jdbcDF.show()
    
      # close session  
      sparkSession.stop()
    
  • Connecting to data sources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session. 
      sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
    
      # Create a data table for DLI - associated GaussDB(DWS)
      sparkSession.sql(
          "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (\
          'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\
          'dbtable'='customer',\
          'user'='dbadmin',\
          'password'='######',\
          'driver'='org.postgresql.Driver')")
    
      # Insert data into the DLI data table  
      sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
    
      # Read data from DLI data table  
      jdbcDF = sparkSession.sql("select * from dli_to_dws").show()
    
      # close session  
      sparkSession.stop()