PySpark Example Code

Development description

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.

  • Code implementation
    1. Dependencies related to import
      1
      2
      3
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
    2. Create a session
      1
      sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
      
  • Connecting to datasources through DataFrame APIs
    1. Configure datasource connection parameters
      1
      2
      3
      4
      5
      url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306"
      dbtable = "test.customer"
      user = "root"
      password = "######"
      driver = "com.mysql.jdbc.Driver"
      
    2. Configure data
      1
      dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)])
      
    3. Configure the schema
      1
      2
      3
      schema = StructType([StructField("id", IntegerType(), False),                     
                           StructField("name", StringType(), False),                    
                           StructField("age", IntegerType(), False)])
      
    4. Create a DataFrame
      1
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    5. Save the data to RDS
      1
      2
      3
      4
      5
      6
      7
      8
      9
      dataFrame.write \   
          .format("jdbc") \   
          .option("url", url) \  
          .option("dbtable", dbtable) \  
          .option("user", user) \  
          .option("password", password) \  
          .option("driver", driver) \   
          .mode("Append") \  
          .save()
      

      The value of mode can be one of the following:

      • ErrorIfExis: If the data already exists, the system throws an exception.
      • Overwrite: If the data already exists, the original data will be overwritten.
      • Append: If the data already exists, the system saves the new data.
      • Ignore: If the data already exists, no operation is required. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
    6. Read data from RDS
      1
      2
      3
      4
      5
      6
      7
      8
      9
      jdbcDF = sparkSession.read \ 
          .format("jdbc") \  
          .option("url", url) \  
          .option("dbtable", dbtable) \ 
          .option("user", user) \  
          .option("password", password) \ 
          .option("driver", driver) \  
          .load()
      jdbcDF.show()
      
    7. Operation result

  • Connecting to datasources through SQL APIs
    1. Create a table to connect to RDS datasource
      1
      2
      3
      4
      5
      6
      7
      sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (
          'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',
          'dbtable'='test.customer',
          'user'='root',
          'password'='######',
          'driver'='com.mysql.jdbc.Driver')")
      

      For details about table creation parameters, see Table 1.

    2. Insert data
      1
      sparkSession.sql("insert into dli_to_rds values(3,'John',24)")
      
    3. Query data
      1
      2
      jdbcDF_after = sparkSession.sql("select * from dli_to_rds")
      jdbcDF_after.show()
      
    4. Operation result

  • Submitting a Spark Job
    1. Upload the Python code file to DLI. For details about console operations, see the Data Lake Insight User Guide. For API references, see Uploading a Resource Package in the Data Lake Insight API Reference.
    2. In the Spark job editor, select the corresponding dependency and execute the Spark job. For details about console operations, see the Data Lake Insight User Guide. For API references, see Creating a Batch Processing Job in the Data Lake Insight API Reference.
      • When submitting a job, you need to specify a dependency module named sys.datasource.rds.
      • For details about how to submit a job on the console, see Table 6-Dependency Resources parameter description in the Data Lake Insight User Guide.
      • For details about how to submit a job through an API, see the modules parameter in Table 2-Request parameter description of Creating a Batch Processing Job in the Data Lake Insight API Reference.

Complete example code

  • Connecting to datasources through DataFrame APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    if __name__ == "__main__":
      # Create a SparkSession session.    
      sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
    
      # Set cross-source connection parameters.   
      url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306"
      dbtable = "test.customer"
      user = "root"
      password = "######"
      driver = "com.mysql.jdbc.Driver"
    
      # Create a DataFrame and initialize the DataFrame data.  
      dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)])
    
      # Setting schema   
      schema = StructType([StructField("id", IntegerType(), False),          
                           StructField("name", StringType(), False),               
                           StructField("age", IntegerType(), False)])
    
      # Create a DataFrame from RDD and schema   
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Write data to the RDS.
      dataFrame.write \    
          .format("jdbc") \     
          .option("url", url) \     
          .option("dbtable", dbtable) \   
          .option("user", user) \    
          .option("password", password) \     
          .option("driver", driver) \     
          .mode("Append") \      
          .save()
    
      # Read data  
      jdbcDF = sparkSession.read \    
          .format("jdbc") \      
          .option("url", url) \   
          .option("dbtable", dbtable) \   
          .option("user", user) \   
          .option("password", password) \    
          .option("driver", driver) \  
          .load()  
      jdbcDF.show()
    
      # close session  
      sparkSession.stop()
    
  • Connecting to datasources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.    
      sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
    
      # Createa data table for DLI - associated RDS    
      sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (
           'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',
           'dbtable'='test.customer',
           'user'='root',
           'password'='######',
           'driver'='com.mysql.jdbc.Driver')")
    
      # Insert data into the DLI data table   
      sparkSession.sql("insert into dli_to_rds values(3,'John',24)")
      
      # Read data from DLI data table    
      jdbcDF = sparkSession.sql("select * from dli_to_rds")   
      jdbcDF.show() 
     
      # close session  
      sparkSession.stop()