PySpark Example Code
Scenario
This section provides PySpark example code that demonstrates how to use a Spark job to access data from the GaussDB(DWS) data source.
A datasource connection has been created and bound to a queue on the DLI management console. For details, see Enhanced Datasource Connections.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
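For example, a minimal sketch that reads the password from an environment variable rather than hard-coding it (the variable name DWS_PASSWORD is an assumption; any secure configuration mechanism works):

import os

# Read the database password from the environment instead of hard-coding it.
# "DWS_PASSWORD" is a hypothetical variable name; set it in the job runtime.
password = os.environ.get("DWS_PASSWORD")
if not password:
    raise RuntimeError("DWS_PASSWORD is not set")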
Preparations
- Import dependency packages.
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
- Create a session.
sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
Accessing a Data Source Using a DataFrame API
- Set connection parameters.
url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
dbtable = "customer"
user = "dbadmin"
password = "######"
driver = "org.postgresql.Driver"
- Set data.
dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
- Configure the schema.
schema = StructType([StructField("id", IntegerType(), False),\
                     StructField("name", StringType(), False),\
                     StructField("age", IntegerType(), False)])
- Create a DataFrame.
dataFrame = sparkSession.createDataFrame(dataList, schema)
- Save the data to GaussDB(DWS).
dataFrame.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .mode("Overwrite") \
    .save()
The mode option can be one of the following (a usage sketch follows the list):
- ErrorIfExists: If the data already exists, an exception is thrown.
- Overwrite: If the data already exists, the original data is overwritten.
- Append: If the data already exists, the new data is appended to it.
- Ignore: If the data already exists, no operation is performed. This is similar to the SQL statement CREATE TABLE IF NOT EXISTS.
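For example, to keep the rows already in the table and add the new ones, only the mode argument in the write call above changes. A minimal sketch:

# Same write as above, but appending instead of overwriting.
dataFrame.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .mode("Append") \
    .save()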
- Read data from GaussDB(DWS).
jdbcDF = sparkSession.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .load()
jdbcDF.show()
- View the operation result.
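For large tables, the JDBC read can be parallelized with Spark's standard partitioning options. The following sketch splits the read on the numeric id column from the example schema; the bounds and partition count are assumptions to adapt to your data:

# Parallel read: Spark issues one query per partition over the id range.
# The bounds below are illustrative; derive them from the actual data.
jdbcDF = sparkSession.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 1000) \
    .option("numPartitions", 4) \
    .load()
jdbcDF.show()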
Accessing a Data Source Using a SQL API
- Create a table to connect to a GaussDB(DWS) data source.
sparkSession.sql(
    "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (\
    'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\
    'dbtable'='customer',\
    'user'='dbadmin',\
    'password'='######',\
    'driver'='org.postgresql.Driver')")
For details about table creation parameters, see Table 1.
- Insert data.
sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
- Query data.
jdbcDF = sparkSession.sql("select * from dli_to_dws")
jdbcDF.show()
- View the operation result.
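If the GaussDB(DWS) table lives in a non-default schema, the dbtable option accepts a schema-qualified name. A sketch assuming a hypothetical schema named sales:

# Associate with a table in a specific DWS schema by qualifying dbtable.
# "sales.customer" is a hypothetical schema-qualified table name.
sparkSession.sql(
    "CREATE TABLE IF NOT EXISTS dli_to_dws_sales USING JDBC OPTIONS (\
    'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\
    'dbtable'='sales.customer',\
    'user'='dbadmin',\
    'password'='######',\
    'driver'='org.postgresql.Driver')")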
Submitting a Spark Job
- Upload the Python code file to DLI. For details about console operations, see Creating a Package. For API operations, see Uploading a Package Group.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job. For details about console operations, see Creating a Spark Job. For API operations, see Creating a Batch Processing Job.
- If the Spark version is 2.3.2 (to be taken offline soon) or 2.4.5, set Module to sys.datasource.dws when you submit a job.
- If the Spark version is 3.1.1, you do not need to select a module. Set Spark parameters (--conf).
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
Complete Example Code
- Connecting to data sources through DataFrame APIs
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()

    # Set cross-source connection parameters.
    url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
    dbtable = "customer"
    user = "dbadmin"
    password = "######"
    driver = "org.postgresql.Driver"

    # Create an RDD with the data to write.
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])

    # Set the schema.
    schema = StructType([StructField("id", IntegerType(), False),\
                         StructField("name", StringType(), False),\
                         StructField("age", IntegerType(), False)])

    # Create a DataFrame from the RDD and schema.
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # Write data to the DWS table.
    dataFrame.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .mode("Overwrite") \
        .save()

    # Read data back.
    jdbcDF = sparkSession.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()
    jdbcDF.show()

    # Close the session.
    sparkSession.stop()
- Connecting to data sources through SQL APIs
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()

    # Create a DLI table associated with the GaussDB(DWS) table.
    sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (\
        'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\
        'dbtable'='customer',\
        'user'='dbadmin',\
        'password'='######',\
        'driver'='org.postgresql.Driver')")

    # Insert data into the DLI table.
    sparkSession.sql("insert into dli_to_dws values(2,'John',24)")

    # Read data from the DLI table.
    jdbcDF = sparkSession.sql("select * from dli_to_dws")
    jdbcDF.show()

    # Close the session.
    sparkSession.stop()