PySpark Example Code

Development description

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.

  • Code implementation
    1. Dependencies related to import
      1
      2
      3
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
    2. Create a session
      1
      sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
      
  • Connecting to datasources through DataFrame APIs
    1. Configure datasource connection parameters
      1
      2
      3
      4
      5
      url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
      dbtable = "customer"
      user = "dbadmin"
      password = "######"
      driver = "org.postgresql.Driver"
      
    2. Configure data
      1
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
      
    3. Configure the schema
      1
      2
      3
      schema = StructType([StructField("id", IntegerType(), False),                
                           StructField("name", StringType(), False),            
                           StructField("age", IntegerType(), False)])
      
    4. Create a DataFrame
      1
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    5. Save the data to DWS
      1
      2
      3
      4
      5
      6
      7
      8
      9
      dataFrame.write \   
          .format("jdbc") \  
          .option("url", url) \  
          .option("dbtable", dbtable) \  
          .option("user", user) \ 
          .option("password", password) \ 
          .option("driver", driver) \ 
          .mode("Overwrite") \  
          .save()
      

      The value of mode can be one of the following:

      • ErrorIfExis: If the data already exists, the system throws an exception.
      • Overwrite: If the data already exists, the original data will be overwritten.
      • Append: If the data already exists, the system saves the new data.
      • Ignore: If the data already exists, no operation is required. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
    6. Read data from DWS
      1
      2
      3
      4
      5
      6
      7
      8
      9
      jdbcDF = sparkSession.read \
          .format("jdbc") \
          .option("url", url) \
          .option("dbtable", dbtable) \
          .option("user", user) \
          .option("password", password) \
          .option("driver", driver) \
          .load()
      jdbcDF.show()
      
    7. Operation result

  • Connecting to datasources through SQL APIs
    1. Create a table to connect to DWS datasource
      1
      2
      3
      4
      5
      6
      7
      sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
          'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',
          'dbtable'='customer',
          'user'='dbadmin',
          'password'='######',
          'driver'='org.postgresql.Driver')")
      

      For details about table creation parameters, see Table 1.

    2. Insert data
      1
      sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
      
    3. Query data
      1
      jdbcDF = sparkSession.sql("select * from dli_to_dws").show()
      
    4. Operation result

  • Submitting a Spark Job
    1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.dws.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Connecting to datasources through DataFrame APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.   
      sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
    
      # Set cross-source connection parameters  
      url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
      dbtable = "customer" 
      user = "dbadmin"
      password = "######"
      driver = "org.postgresql.Driver"
    
      # Create a DataFrame and initialize the DataFrame data.   
      dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
    
      # Setting schema   
      schema = StructType([StructField("id", IntegerType(), False),     
                           StructField("name", StringType(), False),    
                           StructField("age", IntegerType(), False)])
    
      # Create a DataFrame from RDD and schema   
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Write data to the DWS table  
      dataFrame.write \ 
          .format("jdbc") \    
          .option("url", url) \  
          .option("dbtable", dbtable) \    
          .option("user", user) \    
          .option("password", password) \   
          .option("driver", driver) \     
          .mode("Overwrite") \   
          .save()
    
      # Read data   
      jdbcDF = sparkSession.read \  
          .format("jdbc") \     
          .option("url", url) \  
          .option("dbtable", dbtable) \     
          .option("user", user) \     
          .option("password", password) \   
          .option("driver", driver) \  
          .load()  
      jdbcDF.show()
    
      # close session  
      sparkSession.stop()
    
  • Connecting to datasources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session. 
      sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
    
      # Createa data table for DLI - associated DWS 
      sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (
          'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',
          'dbtable'='customer',
          'user'='dbadmin',
          'password'='######',
          'driver'='org.postgresql.Driver')")
    
      # Insert data into the DLI data table  
      sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
    
      # Read data from DLI data table  
      jdbcDF = sparkSession.sql("select * from dli_to_dws").show()
    
      # close session  
      sparkSession.stop()