Updated on 2024-06-13 GMT+08:00

PySpark Example Code

Development Description

  • Prerequisites

    A datasource connection has been created and bound to a queue on the DLI management console.

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
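    As a minimal sketch of the practice above, the credentials can be read from environment variables instead of being written into the script. The variable names RDS_USER and RDS_PASSWORD are illustrative, not part of DLI:

    ```python
    import os

    # Illustrative only: in a real job these variables are set by your
    # deployment, not by the script. setdefault just makes the sketch runnable.
    os.environ.setdefault("RDS_USER", "root")
    os.environ.setdefault("RDS_PASSWORD", "example-secret")

    # Read the credentials at run time instead of hard-coding them.
    user = os.environ["RDS_USER"]
    password = os.environ["RDS_PASSWORD"]
    ```

    If the stored value is encrypted, decrypt it after reading it here, before passing it to the JDBC options.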

  • Code implementation
    1. Import dependency packages.
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType
      from pyspark.sql import SparkSession
      
    2. Create a session.
      sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
      
  • Connecting to data sources through DataFrame APIs
    1. Configure datasource connection parameters.
      url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306"
      dbtable = "test.customer"
      user = "root"
      password = "######"
      driver = "com.mysql.jdbc.Driver"
      

      For details about the parameters, see Table 1.
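      The same five parameters can also be collected into a dict and passed in one call with .options(**...) on the reader or writer; the values below are the placeholders from this example:

      ```python
      # The connection parameters from the step above, gathered into one dict.
      # Values are the example placeholders -- substitute your own instance.
      jdbc_options = {
          "url": "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306",
          "dbtable": "test.customer",
          "user": "root",
          "password": "######",
          "driver": "com.mysql.jdbc.Driver",
      }
      # A DataFrameWriter or DataFrameReader can then use
      # .options(**jdbc_options) instead of chaining five .option(...) calls.
      ```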

    2. Set data.
      dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)])
      
    3. Configure the schema.
      schema = StructType([StructField("id", IntegerType(), False),
                           StructField("name", StringType(), False),
                           StructField("age", IntegerType(), False)])
      
    4. Create a DataFrame.
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    5. Save data to RDS.
      dataFrame.write \
          .format("jdbc") \
          .option("url", url) \
          .option("dbtable", dbtable) \
          .option("user", user) \
          .option("password", password) \
          .option("driver", driver) \
          .mode("Append") \
          .save()
      

      The value of mode can be one of the following:

      • ErrorIfExists: If the data already exists, an exception is thrown.
      • Overwrite: If the data already exists, the original data is overwritten.
      • Append: If the data already exists, the new data is appended to it.
      • Ignore: If the data already exists, no write is performed. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
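      Spark matches mode names case-insensitively, so a small helper (purely illustrative, not part of the Spark API) can guard against typos in the mode string before it reaches DataFrameWriter.mode():

      ```python
      # Hypothetical helper: reject save modes that are not one of the four
      # values listed above. Spark compares mode names case-insensitively.
      VALID_MODES = {"errorifexists", "overwrite", "append", "ignore"}

      def check_save_mode(mode: str) -> str:
          if mode.lower() not in VALID_MODES:
              raise ValueError(f"unknown save mode: {mode!r}")
          return mode

      check_save_mode("Append")  # accepted: this example appends rows
      ```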
    6. Read data from RDS.
      jdbcDF = sparkSession.read \
          .format("jdbc") \
          .option("url", url) \
          .option("dbtable", dbtable) \
          .option("user", user) \
          .option("password", password) \
          .option("driver", driver) \
          .load()
      jdbcDF.show()
      
    7. View the operation result.

  • Connecting to data sources through SQL APIs
    1. Create a table to connect to an RDS data source and set connection parameters.
      sparkSession.sql(
          "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\
          'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\
          'dbtable'='test.customer',\
          'user'='root',\
          'password'='######',\
          'driver'='com.mysql.jdbc.Driver')")
      

      For details about the parameters for creating a table, see Table 1.

    2. Insert data.
      sparkSession.sql("insert into dli_to_rds values(3,'John',24)")
      
    3. Query data.
      jdbcDF_after = sparkSession.sql("select * from dli_to_rds")
      jdbcDF_after.show()
      
    4. View the operation result.

  • Submitting a Spark job
    1. Upload the Python code file to DLI.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

    3. After the Spark job is created, click Execute in the upper right corner of the console to submit the job. If the message "Spark job submitted successfully." is displayed, the Spark job is successfully submitted. You can view the status and logs of the submitted job on the Spark Jobs page.
      • The queue you select for creating a Spark job is the one bound when you create the datasource connection.
      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, set Module to sys.datasource.rds when you submit the job.
      • If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/rds/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/rds/*

      • For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
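      For orientation only, the two classpath settings above would map onto --conf flags in a generic spark-submit-style invocation as shown below. On DLI itself they are entered as Spark parameters in the console or the batch-job API, and the job file name here is a placeholder:

      ```shell
      # Sketch only -- not a DLI command line. Shows how the two settings
      # correspond to --conf flags in a generic spark-submit invocation.
      spark-submit \
        --conf "spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/rds/*" \
        --conf "spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/rds/*" \
        your_job.py
      ```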

Complete Example Code

If you copy the following sample code directly into a .py file, unexpected characters may appear after the backslashes (\). Delete any spaces or indentation after each backslash; otherwise the line continuations will fail.

  • Connecting to data sources through DataFrame APIs
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
    if __name__ == "__main__":
      # Create a SparkSession.
      sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
    
      # Set cross-source connection parameters.   
      url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306"
      dbtable = "test.customer"
      user = "root"
      password = "######"
      driver = "com.mysql.jdbc.Driver"
    
      # Create an RDD holding the source data.
      dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)])
    
      # Set the schema.
      schema = StructType([StructField("id", IntegerType(), False),
                           StructField("name", StringType(), False),
                           StructField("age", IntegerType(), False)])
    
      # Create a DataFrame from the RDD and the schema.
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Write data to the RDS.
      dataFrame.write \
          .format("jdbc") \
          .option("url", url) \
          .option("dbtable", dbtable) \
          .option("user", user) \
          .option("password", password) \
          .option("driver", driver) \
          .mode("Append") \
          .save()
    
      # Read data from RDS.
      jdbcDF = sparkSession.read \
          .format("jdbc") \
          .option("url", url) \
          .option("dbtable", dbtable) \
          .option("user", user) \
          .option("password", password) \
          .option("driver", driver) \
          .load()
      jdbcDF.show()
    
      # Close the session.
      sparkSession.stop()
  • Connecting to data sources through SQL APIs
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession.
      sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
    
      # Create a DLI table associated with the RDS data source.
      sparkSession.sql(
           "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\
           'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\
           'dbtable'='test.customer',\
           'user'='root',\
           'password'='######',\
           'driver'='com.mysql.jdbc.Driver')")
    
      # Insert data into the DLI data table   
      sparkSession.sql("insert into dli_to_rds values(3,'John',24)")
    
      # Read data from the DLI table.
      jdbcDF = sparkSession.sql("select * from dli_to_rds")
      jdbcDF.show()
    
      # Close the session.
      sparkSession.stop()